You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/07 01:26:42 UTC
[11/13] Rename client package from kafka.* to org.apache.kafka.*
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Metrics.java b/clients/src/main/java/kafka/common/metrics/Metrics.java
deleted file mode 100644
index f2cb782..0000000
--- a/clients/src/main/java/kafka/common/metrics/Metrics.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import kafka.common.utils.SystemTime;
-import kafka.common.utils.Time;
-import kafka.common.utils.Utils;
-
-/**
- * A registry of sensors and metrics.
- * <p>
- * A metric is a named, numerical measurement. A sensor is a handle to record numerical measurements as they occur. Each
- * Sensor has zero or more associated metrics. For example a Sensor might represent message sizes and we might associate
- * with this sensor a metric for the average, maximum, or other statistics computed off the sequence of message sizes
- * that are recorded by the sensor.
- * <p>
- * Usage looks something like this:
- *
- * <pre>
- * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
- * sensor.add("kafka.producer.message-sizes.avg", new Avg());
- * sensor.add("kafka.producer.message-sizes.max", new Max());
- *
- * // as messages are sent we record the sizes
- * sensor.record(messageSize);
- * </pre>
- */
-public class Metrics {
-
- private final MetricConfig config;
- private final ConcurrentMap<String, KafkaMetric> metrics;
- private final ConcurrentMap<String, Sensor> sensors;
- private final List<MetricsReporter> reporters;
- private final Time time;
-
- /**
- * Create a metrics repository with no metric reporters and default configuration.
- */
- public Metrics() {
- this(new MetricConfig());
- }
-
- /**
- * Create a metrics repository with no metric reporters and default configuration.
- */
- public Metrics(Time time) {
- this(new MetricConfig(), new ArrayList<MetricsReporter>(), time);
- }
-
- /**
- * Create a metrics repository with no reporters and the given default config. This config will be used for any
- * metric that doesn't override its own config.
- * @param defaultConfig The default config to use for all metrics that don't override their config
- */
- public Metrics(MetricConfig defaultConfig) {
- this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
- }
-
- /**
- * Create a metrics repository with a default config and the given metric reporters
- * @param defaultConfig The default config
- * @param reporters The metrics reporters
- * @param time The time instance to use with the metrics
- */
- public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
- this.config = defaultConfig;
- this.sensors = new ConcurrentHashMap<String, Sensor>();
- this.metrics = new ConcurrentHashMap<String, KafkaMetric>();
- this.reporters = Utils.notNull(reporters);
- this.time = time;
- for (MetricsReporter reporter : reporters)
- reporter.init(new ArrayList<KafkaMetric>());
- }
-
- /**
- * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
- * value recorded with this sensor.
- * @param name The name of the sensor
- * @param parents The parent sensors
- * @return The sensor that is created
- */
- public Sensor sensor(String name, Sensor... parents) {
- return sensor(name, null, parents);
- }
-
- /**
- * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
- * value recorded with this sensor.
- * @param name The name of the sensor
- * @param config A default configuration to use for this sensor for metrics that don't have their own config
- * @param parents The parent sensors
- * @return The sensor that is created
- */
- public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
- Sensor s = this.sensors.get(Utils.notNull(name));
- if (s == null) {
- s = new Sensor(this, name, parents, config == null ? this.config : config, time);
- this.sensors.put(name, s);
- }
- return s;
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param name The name of the metric
- * @param measurable The measurable that will be measured by this metric
- */
- public void addMetric(String name, Measurable measurable) {
- addMetric(name, "", measurable);
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param name The name of the metric
- * @param description A human-readable description to include in the metric
- * @param measurable The measurable that will be measured by this metric
- */
- public void addMetric(String name, String description, Measurable measurable) {
- addMetric(name, description, null, measurable);
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param name The name of the metric
- * @param config The configuration to use when measuring this measurable
- * @param measurable The measurable that will be measured by this metric
- */
- public void addMetric(String name, MetricConfig config, Measurable measurable) {
- addMetric(name, "", config, measurable);
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param name The name of the metric
- * @param description A human-readable description to include in the metric
- * @param config The configuration to use when measuring this measurable
- * @param measurable The measurable that will be measured by this metric
- */
- public synchronized void addMetric(String name, String description, MetricConfig config, Measurable measurable) {
- KafkaMetric m = new KafkaMetric(new Object(),
- Utils.notNull(name),
- Utils.notNull(description),
- Utils.notNull(measurable),
- config == null ? this.config : config,
- time);
- registerMetric(m);
- }
-
- /**
- * Add a MetricReporter
- */
- public synchronized void addReporter(MetricsReporter reporter) {
- Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
- this.reporters.add(reporter);
- }
-
- synchronized void registerMetric(KafkaMetric metric) {
- if (this.metrics.containsKey(metric.name()))
- throw new IllegalArgumentException("A metric named '" + metric.name() + "' already exists, can't register another one.");
- this.metrics.put(metric.name(), metric);
- for (MetricsReporter reporter : reporters)
- reporter.metricChange(metric);
- }
-
- /**
- * Get all the metrics currently maintained indexed by metric name
- */
- public Map<String, KafkaMetric> metrics() {
- return this.metrics;
- }
-
- /**
- * Close this metrics repository.
- */
- public void close() {
- for (MetricsReporter reporter : this.reporters)
- reporter.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
deleted file mode 100644
index bf0b39e..0000000
--- a/clients/src/main/java/kafka/common/metrics/MetricsReporter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.List;
-
-/**
- * A plugin interface to allow things to listen as new metrics are created so they can be reported
- */
-public interface MetricsReporter {
-
- /**
- * This is called when the reporter is first registered to initially register all existing metrics
- * @param metrics All currently existing metrics
- */
- public void init(List<KafkaMetric> metrics);
-
- /**
- * This is called whenever a metric is updated or added
- * @param metric
- */
- public void metricChange(KafkaMetric metric);
-
- /**
- * Called when the metrics repository is closed.
- */
- public void close();
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Quota.java b/clients/src/main/java/kafka/common/metrics/Quota.java
deleted file mode 100644
index 6278246..0000000
--- a/clients/src/main/java/kafka/common/metrics/Quota.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package kafka.common.metrics;
-
-/**
- * An upper or lower bound for metrics
- */
-public final class Quota {
-
- private final boolean upper;
- private final double bound;
-
- public Quota(double bound, boolean upper) {
- this.bound = bound;
- this.upper = upper;
- }
-
- public static Quota lessThan(double upperBound) {
- return new Quota(upperBound, true);
- }
-
- public static Quota moreThan(double lowerBound) {
- return new Quota(lowerBound, false);
- }
-
- public boolean isUpperBound() {
- return this.upper;
- }
-
- public double bound() {
- return this.bound;
- }
-
- public boolean acceptable(double value) {
- return (upper && value <= bound) || (!upper && value >= bound);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
deleted file mode 100644
index b9005cd..0000000
--- a/clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package kafka.common.metrics;
-
-import kafka.common.KafkaException;
-
-/**
- * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
- */
-public class QuotaViolationException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public QuotaViolationException(String m) {
- super(m);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Sensor.java b/clients/src/main/java/kafka/common/metrics/Sensor.java
deleted file mode 100644
index 9c11835..0000000
--- a/clients/src/main/java/kafka/common/metrics/Sensor.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import kafka.common.metrics.CompoundStat.NamedMeasurable;
-import kafka.common.utils.Time;
-import kafka.common.utils.Utils;
-
-/**
- * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
- * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
- * of metrics about request sizes such as the average or max.
- */
-public final class Sensor {
-
- private final Metrics registry;
- private final String name;
- private final Sensor[] parents;
- private final List<Stat> stats;
- private final List<KafkaMetric> metrics;
- private final MetricConfig config;
- private final Time time;
-
- Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
- super();
- this.registry = registry;
- this.name = Utils.notNull(name);
- this.parents = parents;
- this.metrics = new ArrayList<KafkaMetric>();
- this.stats = new ArrayList<Stat>();
- this.config = config;
- this.time = time;
- checkForest(new HashSet<Sensor>());
- }
-
- /* Validate that this sensor doesn't end up referencing itself */
- private void checkForest(Set<Sensor> sensors) {
- if (!sensors.add(this))
- throw new IllegalArgumentException("Circular dependency in sensors: " + name() + " is its own parent.");
- for (int i = 0; i < parents.length; i++)
- parents[i].checkForest(sensors);
- }
-
- /**
- * The name this sensor is registered with. This name will be unique among all registered sensors.
- */
- public String name() {
- return this.name;
- }
-
- /**
- * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
- */
- public void record() {
- record(1.0);
- }
-
- /**
- * Record a value with this sensor
- * @param value The value to record
- * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
- * bound
- */
- public void record(double value) {
- record(value, time.nanoseconds());
- }
-
- private void record(double value, long time) {
- synchronized (this) {
- // increment all the stats
- for (int i = 0; i < this.stats.size(); i++)
- this.stats.get(i).record(config, value, time);
- checkQuotas(time);
-
- }
- for (int i = 0; i < parents.length; i++)
- parents[i].record(value, time);
- }
-
- private void checkQuotas(long time) {
- for (int i = 0; i < this.metrics.size(); i++) {
- KafkaMetric metric = this.metrics.get(i);
- MetricConfig config = metric.config();
- if (config != null) {
- Quota quota = config.quota();
- if (quota != null)
- if (!quota.acceptable(metric.value(time)))
- throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
- }
- }
- }
-
- /**
- * Register a compound statistic with this sensor with no config override
- */
- public void add(CompoundStat stat) {
- add(stat, null);
- }
-
- /**
- * Register a compound statistic with this sensor which yields multiple measurable quantities (like a histogram)
- * @param stat The stat to register
- * @param config The configuration for this stat. If null then the stat will use the default configuration for this
- * sensor.
- */
- public synchronized void add(CompoundStat stat, MetricConfig config) {
- this.stats.add(Utils.notNull(stat));
- for (NamedMeasurable m : stat.stats()) {
- KafkaMetric metric = new KafkaMetric(this, m.name(), m.description(), m.stat(), config == null ? this.config : config, time);
- this.registry.registerMetric(metric);
- this.metrics.add(metric);
- }
- }
-
- /**
- * Add a metric with default configuration and no description. Equivalent to
- * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, null)}
- *
- */
- public void add(String name, MeasurableStat stat) {
- add(name, stat, null);
- }
-
- /**
- * Add a metric with default configuration. Equivalent to
- * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, description, stat, null)}
- *
- */
- public void add(String name, String description, MeasurableStat stat) {
- add(name, description, stat, null);
- }
-
- /**
- * Add a metric to this sensor with no description. Equivalent to
- * {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, config)}
- * @param name
- * @param stat
- * @param config
- */
- public void add(String name, MeasurableStat stat, MetricConfig config) {
- add(name, "", stat, config);
- }
-
- /**
- * Register a metric with this sensor
- * @param name The name of the metric
- * @param description A description used when reporting the value
- * @param stat The statistic to keep
- * @param config A special configuration for this metric. If null use the sensor default configuration.
- */
- public synchronized void add(String name, String description, MeasurableStat stat, MetricConfig config) {
- KafkaMetric metric = new KafkaMetric(this,
- Utils.notNull(name),
- Utils.notNull(description),
- Utils.notNull(stat),
- config == null ? this.config : config,
- time);
- this.registry.registerMetric(metric);
- this.metrics.add(metric);
- this.stats.add(stat);
- }
-
- synchronized List<KafkaMetric> metrics() {
- return Collections.unmodifiableList(this.metrics);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Stat.java b/clients/src/main/java/kafka/common/metrics/Stat.java
deleted file mode 100644
index 8844545..0000000
--- a/clients/src/main/java/kafka/common/metrics/Stat.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package kafka.common.metrics;
-
-/**
- * A Stat is a quanity such as average, max, etc that is computed off the stream of updates to a sensor
- */
-public interface Stat {
-
- /**
- * Record the given value
- * @param config The configuration to use for this metric
- * @param value The value to record
- * @param time The time this value occurred
- */
- public void record(MetricConfig config, double value, long time);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/kafka/common/metrics/stats/Avg.java
deleted file mode 100644
index b9d3d5d..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Avg.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that maintains a simple average over its samples.
- */
-public class Avg extends SampledStat {
-
- public Avg() {
- super(0.0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- long count = 0;
- for (int i = 0; i < samples.size(); i++) {
- Sample s = samples.get(i);
- total += s.value;
- count += s.eventCount;
- }
- return total / count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Count.java b/clients/src/main/java/kafka/common/metrics/stats/Count.java
deleted file mode 100644
index 3712e78..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Count.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that maintains a simple count of what it has seen.
- */
-public class Count extends SampledStat {
-
- public Count() {
- super(0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value += 1.0;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- for (int i = 0; i < samples.size(); i++)
- total += samples.get(i).value;
- return total;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
deleted file mode 100644
index 9922571..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Histogram.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package kafka.common.metrics.stats;
-
-public class Histogram {
-
- private final BinScheme binScheme;
- private final float[] hist;
- private double count;
-
- public Histogram(BinScheme binScheme) {
- this.hist = new float[binScheme.bins()];
- this.count = 0.0f;
- this.binScheme = binScheme;
- }
-
- public void record(double value) {
- this.hist[binScheme.toBin(value)] += 1.0f;
- this.count += 1.0f;
- }
-
- public double value(double quantile) {
- if (count == 0.0d)
- return Double.NaN;
- float sum = 0.0f;
- float quant = (float) quantile;
- for (int i = 0; i < this.hist.length - 1; i++) {
- sum += this.hist[i];
- if (sum / count > quant)
- return binScheme.fromBin(i);
- }
- return Float.POSITIVE_INFINITY;
- }
-
- public float[] counts() {
- return this.hist;
- }
-
- public void clear() {
- for (int i = 0; i < this.hist.length; i++)
- this.hist[i] = 0.0f;
- this.count = 0;
- }
-
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder('{');
- for (int i = 0; i < this.hist.length - 1; i++) {
- b.append(String.format("%.10f", binScheme.fromBin(i)));
- b.append(':');
- b.append(String.format("%.0f", this.hist[i]));
- b.append(',');
- }
- b.append(Float.POSITIVE_INFINITY);
- b.append(':');
- b.append(this.hist[this.hist.length - 1]);
- b.append('}');
- return b.toString();
- }
-
- public interface BinScheme {
- public int bins();
-
- public int toBin(double value);
-
- public double fromBin(int bin);
- }
-
- public static class ConstantBinScheme implements BinScheme {
- private final double min;
- private final double max;
- private final int bins;
- private final double bucketWidth;
-
- public ConstantBinScheme(int bins, double min, double max) {
- if (bins < 2)
- throw new IllegalArgumentException("Must have at least 2 bins.");
- this.min = min;
- this.max = max;
- this.bins = bins;
- this.bucketWidth = (max - min) / (bins - 2);
- }
-
- public int bins() {
- return this.bins;
- }
-
- public double fromBin(int b) {
- if (b == 0)
- return Double.NEGATIVE_INFINITY;
- else if (b == bins - 1)
- return Double.POSITIVE_INFINITY;
- else
- return min + (b - 1) * bucketWidth;
- }
-
- public int toBin(double x) {
- if (x < min)
- return 0;
- else if (x > max)
- return bins - 1;
- else
- return (int) ((x - min) / bucketWidth) + 1;
- }
- }
-
- public static class LinearBinScheme implements BinScheme {
- private final int bins;
- private final double max;
- private final double scale;
-
- public LinearBinScheme(int numBins, double max) {
- this.bins = numBins;
- this.max = max;
- this.scale = max / (numBins * (numBins - 1) / 2);
- }
-
- public int bins() {
- return this.bins;
- }
-
- public double fromBin(int b) {
- if (b == this.bins - 1) {
- return Float.POSITIVE_INFINITY;
- } else {
- double unscaled = (b * (b + 1.0)) / 2.0;
- return unscaled * this.scale;
- }
- }
-
- public int toBin(double x) {
- if (x < 0.0d) {
- throw new IllegalArgumentException("Values less than 0.0 not accepted.");
- } else if (x > this.max) {
- return this.bins - 1;
- } else {
- double scaled = x / this.scale;
- return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Max.java b/clients/src/main/java/kafka/common/metrics/stats/Max.java
deleted file mode 100644
index e7bd1d2..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Max.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that gives the max over its samples.
- */
-public final class Max extends SampledStat {
-
- public Max() {
- super(Double.NEGATIVE_INFINITY);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value = Math.max(sample.value, value);
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double max = Double.NEGATIVE_INFINITY;
- for (int i = 0; i < samples.size(); i++)
- max = Math.max(max, samples.get(i).value);
- return max;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Min.java b/clients/src/main/java/kafka/common/metrics/stats/Min.java
deleted file mode 100644
index db0ab92..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Min.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A {@link SampledStat} that gives the min over its samples.
- */
-public class Min extends SampledStat {
-
- public Min() {
- super(Double.MIN_VALUE);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value = Math.min(sample.value, value);
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double max = Double.MAX_VALUE;
- for (int i = 0; i < samples.size(); i++)
- max = Math.min(max, samples.get(i).value);
- return max;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentile.java b/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
deleted file mode 100644
index 84320bb..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Percentile.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package kafka.common.metrics.stats;
-
-public class Percentile {
-
- private final String name;
- private final String description;
- private final double percentile;
-
- public Percentile(String name, double percentile) {
- this(name, "", percentile);
- }
-
- public Percentile(String name, String description, double percentile) {
- super();
- this.name = name;
- this.description = description;
- this.percentile = percentile;
- }
-
- public String name() {
- return this.name;
- }
-
- public String description() {
- return this.description;
- }
-
- public double percentile() {
- return this.percentile;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
deleted file mode 100644
index c3f8942..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.common.metrics.CompoundStat;
-import kafka.common.metrics.Measurable;
-import kafka.common.metrics.MetricConfig;
-import kafka.common.metrics.stats.Histogram.BinScheme;
-import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
-import kafka.common.metrics.stats.Histogram.LinearBinScheme;
-
-/**
- * A compound stat that reports one or more percentiles
- */
-public class Percentiles extends SampledStat implements CompoundStat {
-
- public static enum BucketSizing {
- CONSTANT, LINEAR
- }
-
- private final int buckets;
- private final Percentile[] percentiles;
- private final BinScheme binScheme;
-
- public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
- this(sizeInBytes, 0.0, max, bucketing, percentiles);
- }
-
- public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
- super(0.0);
- this.percentiles = percentiles;
- this.buckets = sizeInBytes / 4;
- if (bucketing == BucketSizing.CONSTANT) {
- this.binScheme = new ConstantBinScheme(buckets, min, max);
- } else if (bucketing == BucketSizing.LINEAR) {
- if (min != 0.0d)
- throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
- this.binScheme = new LinearBinScheme(buckets, max);
- } else {
- throw new IllegalArgumentException("Unknown bucket type: " + bucketing);
- }
- }
-
- @Override
- public List<NamedMeasurable> stats() {
- List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
- for (Percentile percentile : this.percentiles) {
- final double pct = percentile.percentile();
- ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return value(config, now, pct / 100.0);
- }
- }));
- }
- return ms;
- }
-
- public double value(MetricConfig config, long now, double quantile) {
- timeoutObsoleteSamples(config, now);
- float count = 0.0f;
- for (Sample sample : this.samples)
- count += sample.eventCount;
- if (count == 0.0f)
- return Double.NaN;
- float sum = 0.0f;
- float quant = (float) quantile;
- for (int b = 0; b < buckets; b++) {
- for (int s = 0; s < this.samples.size(); s++) {
- HistogramSample sample = (HistogramSample) this.samples.get(s);
- float[] hist = sample.histogram.counts();
- sum += hist[b];
- if (sum / count > quant)
- return binScheme.fromBin(b);
- }
- }
- return Double.POSITIVE_INFINITY;
- }
-
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- return value(config, now, 0.5);
- }
-
- @Override
- protected HistogramSample newSample(long now) {
- return new HistogramSample(this.binScheme, now);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- HistogramSample hist = (HistogramSample) sample;
- hist.histogram.record(value);
- }
-
- private static class HistogramSample extends SampledStat.Sample {
- private final Histogram histogram;
-
- private HistogramSample(BinScheme scheme, long now) {
- super(0.0, now);
- this.histogram = new Histogram(scheme);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/kafka/common/metrics/stats/Rate.java
deleted file mode 100644
index 3f24a92..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Rate.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import kafka.common.metrics.MeasurableStat;
-import kafka.common.metrics.MetricConfig;
-
-/**
- * The rate of the given quanitity. By default this is the total observed over a set of samples from a sampled statistic
- * divided by the ellapsed time over the sample windows. Alternative {@link SampledStat} implementations can be
- * provided, however, to record the rate of occurences (e.g. the count of values measured over the time interval) or
- * other such values.
- */
-public class Rate implements MeasurableStat {
-
- private final TimeUnit unit;
- private final SampledStat stat;
-
- public Rate(TimeUnit unit) {
- this(unit, new SampledTotal());
- }
-
- public Rate(TimeUnit unit, SampledStat stat) {
- this.stat = stat;
- this.unit = unit;
- }
-
- public String unitName() {
- return unit.name().substring(0, unit.name().length() - 2).toLowerCase();
- }
-
- @Override
- public void record(MetricConfig config, double value, long time) {
- this.stat.record(config, value, time);
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- double ellapsed = convert(now - stat.oldest().lastWindow);
- return stat.measure(config, now) / ellapsed;
- }
-
- private double convert(long time) {
- switch (unit) {
- case NANOSECONDS:
- return time;
- case MICROSECONDS:
- return time / 1000.0;
- case MILLISECONDS:
- return time / (1000.0 * 1000.0);
- case SECONDS:
- return time / (1000.0 * 1000.0 * 1000.0);
- case MINUTES:
- return time / (60.0 * 1000.0 * 1000.0 * 1000.0);
- case HOURS:
- return time / (60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
- case DAYS:
- return time / (24.0 * 60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0);
- default:
- throw new IllegalStateException("Unknown unit: " + unit);
- }
- }
-
- public static class SampledTotal extends SampledStat {
-
- public SampledTotal() {
- super(0.0d);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- for (int i = 0; i < samples.size(); i++)
- total += samples.get(i).value;
- return total;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
deleted file mode 100644
index e696af5..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package kafka.common.metrics.stats;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.common.metrics.MeasurableStat;
-import kafka.common.metrics.MetricConfig;
-
-/**
- * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
- * configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the
- * window is complete when <i>either</i> the event count or ellapsed time criterion is met).
- * <p>
- * All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
- * recycled to begin recording the next sample.
- *
- * Subclasses of this class define different statistics measured using this basic pattern.
- */
-public abstract class SampledStat implements MeasurableStat {
-
- private double initialValue;
- private int current = 0;
- protected List<Sample> samples;
-
- public SampledStat(double initialValue) {
- this.initialValue = initialValue;
- this.samples = new ArrayList<Sample>(2);
- }
-
- @Override
- public void record(MetricConfig config, double value, long now) {
- Sample sample = current(now);
- if (sample.isComplete(now, config))
- sample = advance(config, now);
- update(sample, config, value, now);
- sample.eventCount += 1;
- }
-
- private Sample advance(MetricConfig config, long now) {
- this.current = (this.current + 1) % config.samples();
- if (this.current >= samples.size()) {
- Sample sample = newSample(now);
- this.samples.add(sample);
- return sample;
- } else {
- Sample sample = current(now);
- sample.reset(now);
- return sample;
- }
- }
-
- protected Sample newSample(long now) {
- return new Sample(this.initialValue, now);
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- timeoutObsoleteSamples(config, now);
- return combine(this.samples, config, now);
- }
-
- public Sample current(long now) {
- if (samples.size() == 0)
- this.samples.add(newSample(now));
- return this.samples.get(this.current);
- }
-
- public Sample oldest() {
- return this.samples.get((this.current + 1) % this.samples.size());
- }
-
- protected abstract void update(Sample sample, MetricConfig config, double value, long now);
-
- public abstract double combine(List<Sample> samples, MetricConfig config, long now);
-
- /* Timeout any windows that have expired in the absense of any events */
- protected void timeoutObsoleteSamples(MetricConfig config, long now) {
- for (int i = 0; i < samples.size(); i++) {
- int idx = (this.current + i) % samples.size();
- Sample sample = this.samples.get(idx);
- if (now - sample.lastWindow >= (i + 1) * config.timeWindowNs())
- sample.reset(now);
- }
- }
-
- protected static class Sample {
- public double initialValue;
- public long eventCount;
- public long lastWindow;
- public double value;
-
- public Sample(double initialValue, long now) {
- this.initialValue = initialValue;
- this.eventCount = 0;
- this.lastWindow = now;
- this.value = initialValue;
- }
-
- public void reset(long now) {
- this.eventCount = 0;
- this.lastWindow = now;
- this.value = initialValue;
- }
-
- public boolean isComplete(long now, MetricConfig config) {
- return now - lastWindow >= config.timeWindowNs() || eventCount >= config.eventWindow();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/stats/Total.java b/clients/src/main/java/kafka/common/metrics/stats/Total.java
deleted file mode 100644
index c87b1ba..0000000
--- a/clients/src/main/java/kafka/common/metrics/stats/Total.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package kafka.common.metrics.stats;
-
-import kafka.common.metrics.MeasurableStat;
-import kafka.common.metrics.MetricConfig;
-
-/**
- * An un-windowed cumulative total maintained over all time.
- */
-public class Total implements MeasurableStat {
-
- private double total;
-
- public Total() {
- this.total = 0.0;
- }
-
- public Total(double value) {
- this.total = value;
- }
-
- @Override
- public void record(MetricConfig config, double value, long time) {
- this.total += value;
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- return this.total;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
deleted file mode 100644
index 65a7c64..0000000
--- a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A receive backed by an array of ByteBuffers
- */
-public class ByteBufferReceive implements Receive {
-
- private final int source;
- private final ByteBuffer[] buffers;
- private int remaining;
-
- public ByteBufferReceive(int source, ByteBuffer... buffers) {
- super();
- this.source = source;
- this.buffers = buffers;
- for (int i = 0; i < buffers.length; i++)
- remaining += buffers[i].remaining();
- }
-
- @Override
- public int source() {
- return source;
- }
-
- @Override
- public boolean complete() {
- return remaining > 0;
- }
-
- @Override
- public long readFrom(ScatteringByteChannel channel) throws IOException {
- long read = channel.read(buffers);
- remaining += read;
- return read;
- }
-
- public ByteBuffer[] reify() {
- return buffers;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/kafka/common/network/ByteBufferSend.java
deleted file mode 100644
index 43bf963..0000000
--- a/clients/src/main/java/kafka/common/network/ByteBufferSend.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package kafka.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A send backed by an array of byte buffers
- */
-public class ByteBufferSend implements Send {
-
- private final int destination;
- protected final ByteBuffer[] buffers;
- private int remaining;
-
- public ByteBufferSend(int destination, ByteBuffer... buffers) {
- super();
- this.destination = destination;
- this.buffers = buffers;
- for (int i = 0; i < buffers.length; i++)
- remaining += buffers[i].remaining();
- }
-
- @Override
- public int destination() {
- return destination;
- }
-
- @Override
- public boolean complete() {
- return remaining > 0;
- }
-
- @Override
- public ByteBuffer[] reify() {
- return this.buffers;
- }
-
- @Override
- public int remaining() {
- return this.remaining;
- }
-
- @Override
- public long writeTo(GatheringByteChannel channel) throws IOException {
- long written = channel.write(buffers);
- if (written < 0)
- throw new EOFException("This shouldn't happen.");
- remaining -= written;
- return written;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkReceive.java b/clients/src/main/java/kafka/common/network/NetworkReceive.java
deleted file mode 100644
index 68ae48e..0000000
--- a/clients/src/main/java/kafka/common/network/NetworkReceive.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package kafka.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkReceive implements Receive {
-
- private final int source;
- private final ByteBuffer size;
- private ByteBuffer buffer;
-
- public NetworkReceive(int source, ByteBuffer buffer) {
- this.source = source;
- this.buffer = buffer;
- this.size = null;
- }
-
- public NetworkReceive(int source) {
- this.source = source;
- this.size = ByteBuffer.allocate(4);
- this.buffer = null;
- }
-
- @Override
- public int source() {
- return source;
- }
-
- @Override
- public boolean complete() {
- return !size.hasRemaining() && !buffer.hasRemaining();
- }
-
- @Override
- public ByteBuffer[] reify() {
- return new ByteBuffer[] { this.buffer };
- }
-
- @Override
- public long readFrom(ScatteringByteChannel channel) throws IOException {
- int read = 0;
- if (size.hasRemaining()) {
- int bytesRead = channel.read(size);
- if (bytesRead < 0)
- throw new EOFException();
- read += bytesRead;
- if (!size.hasRemaining()) {
- size.rewind();
- int requestSize = size.getInt();
- if (requestSize < 0)
- throw new IllegalStateException("Invalid request (size = " + requestSize + ")");
- this.buffer = ByteBuffer.allocate(requestSize);
- }
- }
- if (buffer != null) {
- int bytesRead = channel.read(buffer);
- if (bytesRead < 0)
- throw new EOFException();
- read += bytesRead;
- }
-
- return read;
- }
-
- public ByteBuffer payload() {
- return this.buffer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/NetworkSend.java b/clients/src/main/java/kafka/common/network/NetworkSend.java
deleted file mode 100644
index 4e4ac98..0000000
--- a/clients/src/main/java/kafka/common/network/NetworkSend.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package kafka.common.network;
-
-import java.nio.ByteBuffer;
-
-/**
- * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkSend extends ByteBufferSend {
-
- public NetworkSend(int destination, ByteBuffer... buffers) {
- super(destination, sizeDelimit(buffers));
- }
-
- private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
- int size = 0;
- for (int i = 0; i < buffers.length; i++)
- size += buffers[i].remaining();
- ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
- delimited[0] = ByteBuffer.allocate(4);
- delimited[0].putInt(size);
- delimited[0].rewind();
- System.arraycopy(buffers, 0, delimited, 1, buffers.length);
- return delimited;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Receive.java b/clients/src/main/java/kafka/common/network/Receive.java
deleted file mode 100644
index 40ee942..0000000
--- a/clients/src/main/java/kafka/common/network/Receive.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * This interface models the in-progress reading of data from a channel to a source identified by an integer id
- */
-public interface Receive {
-
- /**
- * The numeric id of the source from which we are receiving data.
- */
- public int source();
-
- /**
- * Are we done receiving data?
- */
- public boolean complete();
-
- /**
- * Turn this receive into ByteBuffer instances, if possible (otherwise returns null).
- */
- public ByteBuffer[] reify();
-
- /**
- * Read bytes into this receive from the given channel
- * @param channel The channel to read from
- * @return The number of bytes read
- * @throws IOException If the reading fails
- */
- public long readFrom(ScatteringByteChannel channel) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selectable.java b/clients/src/main/java/kafka/common/network/Selectable.java
deleted file mode 100644
index 794fc60..0000000
--- a/clients/src/main/java/kafka/common/network/Selectable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * An interface for asynchronous, multi-channel network I/O
- */
-public interface Selectable {
-
- /**
- * Begin establishing a socket connection to the given address identified by the given address
- * @param id The id for this connection
- * @param address The address to connect to
- * @param sendBufferSize The send buffer for the socket
- * @param receiveBufferSize The receive buffer for the socket
- * @throws IOException If we cannot begin connecting
- */
- public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
-
- /**
- * Begin disconnecting the connection identified by the given id
- */
- public void disconnect(int id);
-
- /**
- * Wakeup this selector if it is blocked on I/O
- */
- public void wakeup();
-
- /**
- * Close this selector
- */
- public void close();
-
- /**
- * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections,
- * disconnections, existing sends, and receives)
- * @param timeout The amount of time to block if there is nothing to do
- * @param sends The new sends to initiate
- * @throws IOException
- */
- public void poll(long timeout, List<NetworkSend> sends) throws IOException;
-
- /**
- * The list of sends that completed on the last {@link #poll(long, List<NetworkSend>) poll()} call.
- */
- public List<NetworkSend> completedSends();
-
- /**
- * The list of receives that completed on the last {@link #poll(long, List<NetworkSend>) poll()} call.
- */
- public List<NetworkReceive> completedReceives();
-
- /**
- * The list of connections that finished disconnecting on the last {@link #poll(long, List<NetworkSend>) poll()}
- * call.
- */
- public List<Integer> disconnected();
-
- /**
- * The list of connections that completed their connection on the last {@link #poll(long, List<NetworkSend>) poll()}
- * call.
- */
- public List<Integer> connected();
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Selector.java b/clients/src/main/java/kafka/common/network/Selector.java
deleted file mode 100644
index f53060c..0000000
--- a/clients/src/main/java/kafka/common/network/Selector.java
+++ /dev/null
@@ -1,349 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import kafka.common.KafkaException;
-
-/**
- * A selector interface for doing non-blocking multi-connection network I/O.
- * <p>
- * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
- * responses.
- * <p>
- * A connection can be added to the selector associated with an integer id by doing
- *
- * <pre>
- * selector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000);
- * </pre>
- *
- * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
- * the connection. The successful invocation of this method does not mean a valid connection has been established.
- *
- * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
- * connections are all done using the <code>poll()</code> call.
- *
- * <pre>
- * List<NetworkRequest> requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes));
- * selector.poll(TIMEOUT_MS, requestsToSend);
- * </pre>
- *
- * The selector maintains several lists that are reset by each call to <code>poll()</code> which are available via
- * various getters. These are reset by each call to <code>poll()</code>.
- *
- * This class is not thread safe!
- */
-public class Selector implements Selectable {
-
- private final java.nio.channels.Selector selector;
- private final Map<Integer, SelectionKey> keys;
- private final List<NetworkSend> completedSends;
- private final List<NetworkReceive> completedReceives;
- private final List<Integer> disconnected;
- private final List<Integer> connected;
-
- /**
- * Create a new selector
- */
- public Selector() {
- try {
- this.selector = java.nio.channels.Selector.open();
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- this.keys = new HashMap<Integer, SelectionKey>();
- this.completedSends = new ArrayList<NetworkSend>();
- this.completedReceives = new ArrayList<NetworkReceive>();
- this.connected = new ArrayList<Integer>();
- this.disconnected = new ArrayList<Integer>();
- }
-
- /**
- * Begin connecting to the given address and add the connection to this selector associated with the given id
- * number.
- * <p>
- * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)}
- * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
- * @param id The id for the new connection
- * @param address The address to connect to
- * @param sendBufferSize The send buffer for the new connection
- * @param receiveBufferSize The receive buffer for the new connection
- * @throws IllegalStateException if there is already a connection for that id
- * @throws UnresolvedAddressException if DNS resolution fails on the hostname
- */
- @Override
- public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
- SocketChannel channel = SocketChannel.open();
- channel.configureBlocking(false);
- Socket socket = channel.socket();
- socket.setKeepAlive(true);
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
- socket.setTcpNoDelay(true);
- try {
- channel.connect(address);
- } catch (UnresolvedAddressException e) {
- channel.close();
- throw e;
- }
- SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
- key.attach(new Transmissions(id));
- if (this.keys.containsKey(key))
- throw new IllegalStateException("There is already a connection for id " + id);
- this.keys.put(id, key);
- }
-
- /**
- * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
- * processed until the next {@link #poll(long, List) poll()} call.
- */
- @Override
- public void disconnect(int id) {
- SelectionKey key = this.keys.get(id);
- if (key != null)
- key.cancel();
- }
-
- /**
- * Interrupt the selector if it is blocked waiting to do I/O.
- */
- @Override
- public void wakeup() {
- this.selector.wakeup();
- }
-
- /**
- * Close this selector and all associated connections
- */
- @Override
- public void close() {
- for (SelectionKey key : this.selector.keys()) {
- try {
- close(key);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- try {
- this.selector.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
- * disconnections, initiating new sends, or making progress on in-progress sends or receives.
- * <p>
- * The provided network sends will be started.
- *
- * When this call is completed the user can check for completed sends, receives, connections or disconnects using
- * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
- * lists will be cleared at the beginning of each {@link #poll(long, List)} call and repopulated by the call if any
- * completed I/O.
- *
- * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
- * @param sends The list of new sends to begin
- *
- * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
- * already an in-progress send
- */
- @Override
- public void poll(long timeout, List<NetworkSend> sends) throws IOException {
- clear();
-
- /* register for write interest on any new sends */
- for (NetworkSend send : sends) {
- SelectionKey key = keyForId(send.destination());
- Transmissions transmissions = transmissions(key);
- if (transmissions.hasSend())
- throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
- transmissions.send = send;
- try {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } catch (CancelledKeyException e) {
- close(key);
- }
- }
-
- /* check ready keys */
- int readyKeys = select(timeout);
- if (readyKeys > 0) {
- Set<SelectionKey> keys = this.selector.selectedKeys();
- Iterator<SelectionKey> iter = keys.iterator();
- while (iter.hasNext()) {
- SelectionKey key = iter.next();
- iter.remove();
-
- Transmissions transmissions = transmissions(key);
- SocketChannel channel = channel(key);
- try {
- /*
- * complete any connections that have finished their handshake
- */
- if (key.isConnectable()) {
- channel.finishConnect();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
- this.connected.add(transmissions.id);
- }
-
- /* read from any connections that have readable data */
- if (key.isReadable()) {
- if (!transmissions.hasReceive())
- transmissions.receive = new NetworkReceive(transmissions.id);
- transmissions.receive.readFrom(channel);
- if (transmissions.receive.complete()) {
- transmissions.receive.payload().rewind();
- this.completedReceives.add(transmissions.receive);
- transmissions.clearReceive();
- }
- }
-
- /*
- * write to any sockets that have space in their buffer and for which we have data
- */
- if (key.isWritable()) {
- transmissions.send.writeTo(channel);
- if (transmissions.send.remaining() <= 0) {
- this.completedSends.add(transmissions.send);
- transmissions.clearSend();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- }
- }
-
- /* cancel any defunct sockets */
- if (!key.isValid())
- close(key);
- } catch (IOException e) {
- e.printStackTrace();
- close(key);
- }
- }
- }
- }
-
- @Override
- public List<NetworkSend> completedSends() {
- return this.completedSends;
- }
-
- @Override
- public List<NetworkReceive> completedReceives() {
- return this.completedReceives;
- }
-
- @Override
- public List<Integer> disconnected() {
- return this.disconnected;
- }
-
- @Override
- public List<Integer> connected() {
- return this.connected;
- }
-
- /**
- * Clear the results from the prior poll
- */
- private void clear() {
- this.completedSends.clear();
- this.completedReceives.clear();
- this.connected.clear();
- this.disconnected.clear();
- }
-
- /**
- * Check for data, waiting up to the given timeout.
- *
- * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
- * @return The number of keys ready
- * @throws IOException
- */
- private int select(long ms) throws IOException {
- if (ms == 0L)
- return this.selector.selectNow();
- else if (ms < 0L)
- return this.selector.select();
- else
- return this.selector.select(ms);
- }
-
- /**
- * Begin closing this connection
- */
- private void close(SelectionKey key) throws IOException {
- SocketChannel channel = channel(key);
- Transmissions trans = transmissions(key);
- if (trans != null)
- this.disconnected.add(trans.id);
- key.attach(null);
- key.cancel();
- channel.socket().close();
- channel.close();
- }
-
- /**
- * Get the selection key associated with this numeric id
- */
- private SelectionKey keyForId(int id) {
- SelectionKey key = this.keys.get(id);
- if (key == null)
- throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
- return key;
- }
-
- /**
- * Get the transmissions for the given connection
- */
- private Transmissions transmissions(SelectionKey key) {
- return (Transmissions) key.attachment();
- }
-
- /**
- * Get the socket channel associated with this selection key
- */
- private SocketChannel channel(SelectionKey key) {
- return (SocketChannel) key.channel();
- }
-
- /**
- * The id and in-progress send and receive associated with a connection
- */
- private static class Transmissions {
- public int id;
- public NetworkSend send;
- public NetworkReceive receive;
-
- public Transmissions(int id) {
- this.id = id;
- }
-
- public boolean hasSend() {
- return this.send != null;
- }
-
- public void clearSend() {
- this.send = null;
- }
-
- public boolean hasReceive() {
- return this.receive != null;
- }
-
- public void clearReceive() {
- this.receive = null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/Send.java b/clients/src/main/java/kafka/common/network/Send.java
deleted file mode 100644
index e7ef68a..0000000
--- a/clients/src/main/java/kafka/common/network/Send.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package kafka.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * This interface models the in-progress sending of data to a destination identified by an integer id.
- */
-public interface Send {
-
- /**
- * The numeric id for the destination of this send
- */
- public int destination();
-
- /**
- * The number of bytes remaining to send
- */
- public int remaining();
-
- /**
- * Is this send complete?
- */
- public boolean complete();
-
- /**
- * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null)
- */
- public ByteBuffer[] reify();
-
- /**
- * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
- * to be completely written
- * @param channel The channel to write to
- * @return The number of bytes written
- * @throws IOException If the write fails
- */
- public long writeTo(GatheringByteChannel channel) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/kafka/common/protocol/ApiKeys.java
deleted file mode 100644
index 1e2f8bb..0000000
--- a/clients/src/main/java/kafka/common/protocol/ApiKeys.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.common.protocol;
-
-/**
- * Identifiers for all the Kafka APIs
- */
-public enum ApiKeys {
- PRODUCE(0, "produce"),
- FETCH(1, "fetch"),
- LIST_OFFSETS(2, "list_offsets"),
- METADATA(3, "metadata"),
- LEADER_AND_ISR(4, "leader_and_isr"),
- STOP_REPLICA(5, "stop_replica"),
- OFFSET_COMMIT(6, "offset_commit"),
- OFFSET_FETCH(7, "offset_fetch");
-
- public static int MAX_API_KEY = 0;
-
- static {
- for (ApiKeys key : ApiKeys.values()) {
- MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
- }
- }
-
- /** the perminant and immutable id of an API--this can't change ever */
- public final short id;
-
- /** an english description of the api--this is for debugging and can change */
- public final String name;
-
- private ApiKeys(int id, String name) {
- this.id = (short) id;
- this.name = name;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Errors.java b/clients/src/main/java/kafka/common/protocol/Errors.java
deleted file mode 100644
index 402a6c0..0000000
--- a/clients/src/main/java/kafka/common/protocol/Errors.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package kafka.common.protocol;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import kafka.common.errors.ApiException;
-import kafka.common.errors.CorruptRecordException;
-import kafka.common.errors.LeaderNotAvailableException;
-import kafka.common.errors.RecordTooLargeException;
-import kafka.common.errors.NetworkException;
-import kafka.common.errors.NotLeaderForPartitionException;
-import kafka.common.errors.OffsetMetadataTooLarge;
-import kafka.common.errors.OffsetOutOfRangeException;
-import kafka.common.errors.TimeoutException;
-import kafka.common.errors.UnknownServerException;
-import kafka.common.errors.UnknownTopicOrPartitionException;
-
-/**
- * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
- * are thus part of the protocol. The names can be changed but the error code cannot.
- *
- * Do not add exceptions that occur only on the client or only on the server here.
- */
-public enum Errors {
- UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
- NONE(0, null),
- OFFSET_OUT_OF_RANGE(1,
- new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
- CORRUPT_MESSAGE(2,
- new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
- UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
- LEADER_NOT_AVAILABLE(5,
- new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
- NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
- REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
- MESSAGE_TOO_LARGE(10,
- new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
- OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
- NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
-
- private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
- private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
- static {
- for (Errors error : Errors.values()) {
- codeToError.put(error.code(), error);
- if (error.exception != null)
- classToError.put(error.exception.getClass(), error);
- }
-
- }
-
- private final short code;
- private final ApiException exception;
-
- private Errors(int code, ApiException exception) {
- this.code = (short) code;
- this.exception = exception;
- }
-
- /**
- * An instance of the exception
- */
- public ApiException exception() {
- return this.exception;
- }
-
- /**
- * The error code for the exception
- */
- public short code() {
- return this.code;
- }
-
- /**
- * Throw the exception corresponding to this error if there is one
- */
- public void maybeThrow() {
- if (exception != null)
- throw this.exception;
- }
-
- /**
- * Throw the exception if there is one
- */
- public static Errors forCode(short code) {
- Errors error = codeToError.get(code);
- return error == null ? UNKNOWN : error;
- }
-
- /**
- * Return the error instance associated with this exception (or UKNOWN if there is none)
- */
- public static Errors forException(Throwable t) {
- Errors error = classToError.get(t.getClass());
- return error == null ? UNKNOWN : error;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
deleted file mode 100644
index 576c24d..0000000
--- a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package kafka.common.protocol;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
-import kafka.common.protocol.types.Schema;
-import kafka.common.protocol.types.Struct;
-
-public class ProtoUtils {
-
- private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
- if (apiKey < 0 || apiKey > schemas.length)
- throw new IllegalArgumentException("Invalid api key: " + apiKey);
- Schema[] versions = schemas[apiKey];
- if (version < 0 || version > versions.length)
- throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
- return versions[version];
- }
-
- public static short latestVersion(int apiKey) {
- if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length)
- throw new IllegalArgumentException("Invalid api key: " + apiKey);
- return Protocol.CURR_VERSION[apiKey];
- }
-
- public static Schema requestSchema(int apiKey, int version) {
- return schemaFor(Protocol.REQUESTS, apiKey, version);
- }
-
- public static Schema currentRequestSchema(int apiKey) {
- return requestSchema(apiKey, latestVersion(apiKey));
- }
-
- public static Schema responseSchema(int apiKey, int version) {
- return schemaFor(Protocol.RESPONSES, apiKey, version);
- }
-
- public static Schema currentResponseSchema(int apiKey) {
- return schemaFor(Protocol.RESPONSES, apiKey, latestVersion(apiKey));
- }
-
- public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
- return (Struct) requestSchema(apiKey, version).read(buffer);
- }
-
- public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
- return (Struct) currentResponseSchema(apiKey).read(buffer);
- }
-
- public static Cluster parseMetadataResponse(Struct response) {
- Map<Integer, Node> brokers = new HashMap<Integer, Node>();
- Object[] brokerStructs = (Object[]) response.get("brokers");
- for (int i = 0; i < brokerStructs.length; i++) {
- Struct broker = (Struct) brokerStructs[i];
- int nodeId = (Integer) broker.get("node_id");
- String host = (String) broker.get("host");
- int port = (Integer) broker.get("port");
- brokers.put(nodeId, new Node(nodeId, host, port));
- }
- List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
- Object[] topicInfos = (Object[]) response.get("topic_metadata");
- for (int i = 0; i < topicInfos.length; i++) {
- Struct topicInfo = (Struct) topicInfos[i];
- short topicError = topicInfo.getShort("topic_error_code");
- if (topicError == Errors.NONE.code()) {
- String topic = topicInfo.getString("topic");
- Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
- for (int j = 0; j < partitionInfos.length; j++) {
- Struct partitionInfo = (Struct) partitionInfos[j];
- short partError = partitionInfo.getShort("partition_error_code");
- if (partError == Errors.NONE.code()) {
- int partition = partitionInfo.getInt("partition_id");
- int leader = partitionInfo.getInt("leader");
- Node leaderNode = leader == -1 ? null : brokers.get(leader);
- Object[] replicas = (Object[]) partitionInfo.get("replicas");
- Node[] replicaNodes = new Node[replicas.length];
- for (int k = 0; k < replicas.length; k++)
- replicaNodes[k] = brokers.get(replicas[k]);
- Object[] isr = (Object[]) partitionInfo.get("isr");
- Node[] isrNodes = new Node[isr.length];
- for (int k = 0; k < isr.length; k++)
- isrNodes[k] = brokers.get(isr[k]);
- partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
- }
- }
- }
- }
- return new Cluster(brokers.values(), partitions);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Protocol.java b/clients/src/main/java/kafka/common/protocol/Protocol.java
deleted file mode 100644
index 49b60aa..0000000
--- a/clients/src/main/java/kafka/common/protocol/Protocol.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package kafka.common.protocol;
-
-import static kafka.common.protocol.types.Type.BYTES;
-import static kafka.common.protocol.types.Type.INT16;
-import static kafka.common.protocol.types.Type.INT32;
-import static kafka.common.protocol.types.Type.INT64;
-import static kafka.common.protocol.types.Type.STRING;
-import kafka.common.protocol.types.ArrayOf;
-import kafka.common.protocol.types.Field;
-import kafka.common.protocol.types.Schema;
-
-public class Protocol {
-
- public static Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
- new Field("api_version", INT16, "The version of the API."),
- new Field("correlation_id",
- INT32,
- "A user-supplied integer value that will be passed back with the response"),
- new Field("client_id",
- STRING,
- "A user specified identifier for the client making the request."));
-
- public static Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
- INT32,
- "The user-supplied value passed in with the request"));
-
- /* Metadata api */
-
- public static Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
- new ArrayOf(STRING),
- "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
-
- public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
- new Field("host", STRING, "The hostname of the broker."),
- new Field("port", INT32, "The port on which the broker accepts requests."));
-
- public static Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
- INT16,
- "The error code for the partition, if any."),
- new Field("partition_id", INT32, "The id of the partition."),
- new Field("leader",
- INT32,
- "The id of the broker acting as leader for this partition."),
- new Field("replicas",
- new ArrayOf(INT32),
- "The set of all nodes that host this partition."),
- new Field("isr",
- new ArrayOf(INT32),
- "The set of nodes that are in sync with the leader for this partition."));
-
- public static Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
- new Field("topic", STRING, "The name of the topic"),
- new Field("partition_metadata",
- new ArrayOf(PARTITION_METADATA_V0),
- "Metadata for each partition of the topic."));
-
- public static Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
- new ArrayOf(BROKER),
- "Host and port information for all brokers."),
- new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));
-
- public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 };
- public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 };
-
- /* Produce api */
-
- public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
- new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
- new Field("record_set", BYTES)))));
-
- public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
- INT16,
- "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
- new Field("timeout", INT32, "The time to await a response in ms."),
- new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
- public static Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(new Schema(new Field("partition",
- INT32),
- new Field("error_code",
- INT16),
- new Field("base_offset",
- INT64))))))));
-
- public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
- public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
-
- /* an array of all requests and responses with all schema versions */
- public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
- public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
-
- /* the latest version of each api */
- public static short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
-
- static {
- REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
- REQUESTS[ApiKeys.FETCH.id] = new Schema[] {};
- REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
- REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
- REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
- REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
- REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
- REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
-
- RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
- RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
- RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
- RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
- RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
- RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
- RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
- RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
-
- /* set the maximum version of each api */
- for (ApiKeys api : ApiKeys.values())
- CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
-
- /* sanity check that we have the same number of request and response versions for each api */
- for (ApiKeys api : ApiKeys.values())
- if (REQUESTS[api.id].length != RESPONSES[api.id].length)
- throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api "
- + api.name
- + " but "
- + RESPONSES[api.id].length
- + " response versions.");
- }
-
-}