You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:26:01 UTC
[44/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MeasurableStat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MeasurableStat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MeasurableStat.java
deleted file mode 100644
index a3ee306..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MeasurableStat.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.metrics.stats.Count;
-import org.apache.flink.kafka_backport.common.metrics.stats.Max;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A MeasurableStat is a {@link Stat} that is also {@link Measurable} (i.e. can produce a single floating point value).
- * This is the interface used for most of the simple statistics such as {@link org.apache.flink.kafka_backport.common.metrics.stats.Avg},
- * {@link Max}, {@link Count}, etc.
- */
-public interface MeasurableStat extends Stat, Measurable {
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricConfig.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricConfig.java
deleted file mode 100644
index f95824a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricConfig.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import java.util.concurrent.TimeUnit;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Configuration values for metrics
- */
-public class MetricConfig {
-
- private Quota quota;
- private int samples;
- private long eventWindow;
- private long timeWindowMs;
- private TimeUnit unit;
-
- public MetricConfig() {
- super();
- this.quota = null;
- this.samples = 2;
- this.eventWindow = Long.MAX_VALUE;
- this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
- this.unit = TimeUnit.SECONDS;
- }
-
- public Quota quota() {
- return this.quota;
- }
-
- public MetricConfig quota(Quota quota) {
- this.quota = quota;
- return this;
- }
-
- public long eventWindow() {
- return eventWindow;
- }
-
- public MetricConfig eventWindow(long window) {
- this.eventWindow = window;
- return this;
- }
-
- public long timeWindowMs() {
- return timeWindowMs;
- }
-
- public MetricConfig timeWindow(long window, TimeUnit unit) {
- this.timeWindowMs = TimeUnit.MILLISECONDS.convert(window, unit);
- return this;
- }
-
- public int samples() {
- return this.samples;
- }
-
- public MetricConfig samples(int samples) {
- if (samples < 1)
- throw new IllegalArgumentException("The number of samples must be at least 1.");
- this.samples = samples;
- return this;
- }
-
- public TimeUnit timeUnit() {
- return unit;
- }
-
- public MetricConfig timeUnit(TimeUnit unit) {
- this.unit = unit;
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Metrics.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Metrics.java
deleted file mode 100644
index ea7e5f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Metrics.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.utils.SystemTime;
-import org.apache.flink.kafka_backport.common.utils.Utils;
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.apache.flink.kafka_backport.common.utils.CopyOnWriteMap;
-import org.apache.flink.kafka_backport.common.utils.Time;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A registry of sensors and metrics.
- * <p>
- * A metric is a named, numerical measurement. A sensor is a handle to record numerical measurements as they occur. Each
- * Sensor has zero or more associated metrics. For example a Sensor might represent message sizes and we might associate
- * with this sensor a metric for the average, maximum, or other statistics computed off the sequence of message sizes
- * that are recorded by the sensor.
- * <p>
- * Usage looks something like this:
- *
- * <pre>
- * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics");
- * sensor.add(metricName, new Avg());
- * metricName = new MetricName("message-size-max", "producer-metrics");
- * sensor.add(metricName, new Max());
- *
- * // as messages are sent we record the sizes
- * sensor.record(messageSize);
- * </pre>
- */
-public class Metrics implements Closeable {
-
- private final MetricConfig config;
- private final ConcurrentMap<MetricName, KafkaMetric> metrics;
- private final ConcurrentMap<String, Sensor> sensors;
- private final List<MetricsReporter> reporters;
- private final Time time;
-
- /**
- * Create a metrics repository with no metric reporters and default configuration.
- */
- public Metrics() {
- this(new MetricConfig());
- }
-
- /**
- * Create a metrics repository with no metric reporters and default configuration, using the given time instance.
- */
- public Metrics(Time time) {
- this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
- }
-
- /**
- * Create a metrics repository with no reporters and the given default config. This config will be used for any
- * metric that doesn't override its own config.
- * @param defaultConfig The default config to use for all metrics that don't override their config
- */
- public Metrics(MetricConfig defaultConfig) {
- this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
- }
-
- /**
- * Create a metrics repository with a default config and the given metric reporters
- * @param defaultConfig The default config
- * @param reporters The metrics reporters
- * @param time The time instance to use with the metrics
- */
- public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
- this.config = defaultConfig;
- this.sensors = new CopyOnWriteMap<String, Sensor>();
- this.metrics = new CopyOnWriteMap<MetricName, KafkaMetric>();
- this.reporters = Utils.notNull(reporters);
- this.time = time;
- for (MetricsReporter reporter : reporters)
- reporter.init(new ArrayList<KafkaMetric>());
- }
-
- /**
- * Get the sensor with the given name if it exists
- * @param name The name of the sensor
- * @return Return the sensor or null if no such sensor exists
- */
- public Sensor getSensor(String name) {
- return this.sensors.get(Utils.notNull(name));
- }
-
- /**
- * Get or create a sensor with the given unique name and no parent sensors.
- * @param name The sensor name
- * @return The sensor
- */
- public Sensor sensor(String name) {
- return sensor(name, null, (Sensor[]) null);
- }
-
- /**
- * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
- * receive every value recorded with this sensor.
- * @param name The name of the sensor
- * @param parents The parent sensors
- * @return The existing sensor with this name, or a newly created one if none exists
- */
- public Sensor sensor(String name, Sensor... parents) {
- return sensor(name, null, parents);
- }
-
- /**
- * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
- * receive every value recorded with this sensor.
- * @param name The name of the sensor
- * @param config A default configuration to use for this sensor for metrics that don't have their own config
- * @param parents The parent sensors
- * @return The existing sensor with this name, or a newly created one if none exists
- */
- public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
- Sensor s = getSensor(name);
- if (s == null) {
- s = new Sensor(this, name, parents, config == null ? this.config : config, time);
- this.sensors.put(name, s);
- }
- return s;
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param metricName The name of the metric
- * @param measurable The measurable that will be measured by this metric
- */
- public void addMetric(MetricName metricName, Measurable measurable) {
- addMetric(metricName, null, measurable);
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param metricName The name of the metric
- * @param config The configuration to use when measuring this measurable
- * @param measurable The measurable that will be measured by this metric
- */
- public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
- KafkaMetric m = new KafkaMetric(new Object(),
- Utils.notNull(metricName),
- Utils.notNull(measurable),
- config == null ? this.config : config,
- time);
- registerMetric(m);
- }
-
- /**
- * Add a MetricsReporter; it is initialized with all currently registered metrics
- */
- public synchronized void addReporter(MetricsReporter reporter) {
- Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
- this.reporters.add(reporter);
- }
-
- synchronized void registerMetric(KafkaMetric metric) {
- MetricName metricName = metric.metricName();
- if (this.metrics.containsKey(metricName))
- throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
- this.metrics.put(metricName, metric);
- for (MetricsReporter reporter : reporters)
- reporter.metricChange(metric);
- }
-
- /**
- * Get all the metrics currently maintained indexed by metricName
- */
- public Map<MetricName, KafkaMetric> metrics() {
- return this.metrics;
- }
-
- /**
- * Close this metrics repository.
- */
- @Override
- public void close() {
- for (MetricsReporter reporter : this.reporters)
- reporter.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricsReporter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricsReporter.java
deleted file mode 100644
index 870198b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/MetricsReporter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.Configurable;
-
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A plugin interface to allow things to listen as new metrics are created so they can be reported.
- */
-public interface MetricsReporter extends Configurable {
-
- /**
- * This is called when the reporter is first registered to initially register all existing metrics
- * @param metrics All currently existing metrics
- */
- public void init(List<KafkaMetric> metrics);
-
- /**
- * This is called whenever a metric is updated or added
- * @param metric The metric that was updated or added
- */
- public void metricChange(KafkaMetric metric);
-
- /**
- * Called when the metrics repository is closed.
- */
- public void close();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Quota.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Quota.java
deleted file mode 100644
index 1de74bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Quota.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * An upper or lower bound for metrics
- */
-public final class Quota {
-
- private final boolean upper;
- private final double bound;
-
- public Quota(double bound, boolean upper) {
- this.bound = bound;
- this.upper = upper;
- }
-
- public static Quota lessThan(double upperBound) {
- return new Quota(upperBound, true);
- }
-
- public static Quota moreThan(double lowerBound) {
- return new Quota(lowerBound, false);
- }
-
- public boolean isUpperBound() {
- return this.upper;
- }
-
- public double bound() {
- return this.bound;
- }
-
- public boolean acceptable(double value) {
- return (upper && value <= bound) || (!upper && value >= bound);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/QuotaViolationException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/QuotaViolationException.java
deleted file mode 100644
index ee1d073..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/QuotaViolationException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
- */
-public class QuotaViolationException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public QuotaViolationException(String m) {
- super(m);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Sensor.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Sensor.java
deleted file mode 100644
index 16be7ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Sensor.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.apache.flink.kafka_backport.common.utils.Time;
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
- * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
- * of metrics about message sizes such as the average or max.
- */
-public final class Sensor {
-
- private final Metrics registry;
- private final String name;
- private final Sensor[] parents;
- private final List<Stat> stats;
- private final List<KafkaMetric> metrics;
- private final MetricConfig config;
- private final Time time;
-
- Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
- super();
- this.registry = registry;
- this.name = Utils.notNull(name);
- this.parents = parents == null ? new Sensor[0] : parents;
- this.metrics = new ArrayList<KafkaMetric>();
- this.stats = new ArrayList<Stat>();
- this.config = config;
- this.time = time;
- checkForest(new HashSet<Sensor>());
- }
-
- /* Validate that this sensor doesn't end up referencing itself */
- private void checkForest(Set<Sensor> sensors) {
- if (!sensors.add(this))
- throw new IllegalArgumentException("Circular dependency in sensors: " + name() + " is its own parent.");
- for (int i = 0; i < parents.length; i++)
- parents[i].checkForest(sensors);
- }
-
- /**
- * The name this sensor is registered with. This name will be unique among all registered sensors.
- */
- public String name() {
- return this.name;
- }
-
- /**
- * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
- */
- public void record() {
- record(1.0);
- }
-
- /**
- * Record a value with this sensor
- * @param value The value to record
- * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
- * bound
- */
- public void record(double value) {
- record(value, time.milliseconds());
- }
-
- /**
- * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse
- * the time stamp.
- * @param value The value we are recording
- * @param timeMs The current POSIX time in milliseconds
- * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
- * bound
- */
- public void record(double value, long timeMs) {
- synchronized (this) {
- // increment all the stats
- for (int i = 0; i < this.stats.size(); i++)
- this.stats.get(i).record(config, value, timeMs);
- checkQuotas(timeMs);
- }
- for (int i = 0; i < parents.length; i++)
- parents[i].record(value, timeMs);
- }
-
- /**
- * Check if we have violated our quota for any metric that has a configured quota
- * @param timeMs The time in milliseconds at which each metric's value is evaluated
- */
- private void checkQuotas(long timeMs) {
- for (int i = 0; i < this.metrics.size(); i++) {
- KafkaMetric metric = this.metrics.get(i);
- MetricConfig config = metric.config();
- if (config != null) {
- Quota quota = config.quota();
- if (quota != null) {
- if (!quota.acceptable(metric.value(timeMs)))
- throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound());
- }
- }
- }
- }
-
- /**
- * Register a compound statistic with this sensor with no config override
- */
- public void add(CompoundStat stat) {
- add(stat, null);
- }
-
- /**
- * Register a compound statistic with this sensor which yields multiple measurable quantities (like a histogram)
- * @param stat The stat to register
- * @param config The configuration for this stat. If null then the stat will use the default configuration for this
- * sensor.
- */
- public synchronized void add(CompoundStat stat, MetricConfig config) {
- this.stats.add(Utils.notNull(stat));
- for (CompoundStat.NamedMeasurable m : stat.stats()) {
- KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(), config == null ? this.config : config, time);
- this.registry.registerMetric(metric);
- this.metrics.add(metric);
- }
- }
-
- /**
- * Register a metric with this sensor
- * @param metricName The name of the metric
- * @param stat The statistic to keep
- */
- public void add(MetricName metricName, MeasurableStat stat) {
- add(metricName, stat, null);
- }
-
- /**
- * Register a metric with this sensor
- * @param metricName The name of the metric
- * @param stat The statistic to keep
- * @param config A special configuration for this metric. If null use the sensor default configuration.
- */
- public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
- KafkaMetric metric = new KafkaMetric(new Object(),
- Utils.notNull(metricName),
- Utils.notNull(stat),
- config == null ? this.config : config,
- time);
- this.registry.registerMetric(metric);
- this.metrics.add(metric);
- this.stats.add(stat);
- }
-
- synchronized List<KafkaMetric> metrics() {
- return Collections.unmodifiableList(this.metrics);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Stat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Stat.java
deleted file mode 100644
index 25ce62b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Stat.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A Stat is a quantity such as average, max, etc. that is computed off the stream of updates to a sensor
- */
-public interface Stat {
-
- /**
- * Record the given value
- * @param config The configuration to use for this metric
- * @param value The value to record
- * @param timeMs The POSIX time in milliseconds this value occurred
- */
- public void record(MetricConfig config, double value, long timeMs);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Avg.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Avg.java
deleted file mode 100644
index 6b58bc7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Avg.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A {@link SampledStat} that maintains a simple average over its samples.
- */
-public class Avg extends SampledStat {
-
- public Avg() {
- super(0.0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- long count = 0;
- for (int i = 0; i < samples.size(); i++) {
- Sample s = samples.get(i);
- total += s.value;
- count += s.eventCount;
- }
- return total / count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Count.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Count.java
deleted file mode 100644
index b7fdb1e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Count.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A {@link SampledStat} that maintains a simple count of what it has seen.
- */
-public class Count extends SampledStat {
-
- public Count() {
- super(0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value += 1.0;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- for (int i = 0; i < samples.size(); i++)
- total += samples.get(i).value;
- return total;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Histogram.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Histogram.java
deleted file mode 100644
index 9289e89..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Histogram.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class Histogram {
-
- private final BinScheme binScheme;
- private final float[] hist;
- private double count;
-
- public Histogram(BinScheme binScheme) {
- this.hist = new float[binScheme.bins()];
- this.count = 0.0f;
- this.binScheme = binScheme;
- }
-
- public void record(double value) {
- this.hist[binScheme.toBin(value)] += 1.0f;
- this.count += 1.0f;
- }
-
- public double value(double quantile) {
- if (count == 0.0d)
- return Double.NaN;
- float sum = 0.0f;
- float quant = (float) quantile;
- for (int i = 0; i < this.hist.length - 1; i++) {
- sum += this.hist[i];
- if (sum / count > quant)
- return binScheme.fromBin(i);
- }
- return Float.POSITIVE_INFINITY;
- }
-
- public float[] counts() {
- return this.hist;
- }
-
- public void clear() {
- for (int i = 0; i < this.hist.length; i++)
- this.hist[i] = 0.0f;
- this.count = 0;
- }
-
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder("{");
- for (int i = 0; i < this.hist.length - 1; i++) {
- b.append(String.format("%.10f", binScheme.fromBin(i)));
- b.append(':');
- b.append(String.format("%.0f", this.hist[i]));
- b.append(',');
- }
- b.append(Float.POSITIVE_INFINITY);
- b.append(':');
- b.append(this.hist[this.hist.length - 1]);
- b.append('}');
- return b.toString();
- }
-
- public interface BinScheme {
- public int bins();
-
- public int toBin(double value);
-
- public double fromBin(int bin);
- }
-
- public static class ConstantBinScheme implements BinScheme {
- private final double min;
- private final double max;
- private final int bins;
- private final double bucketWidth;
-
- public ConstantBinScheme(int bins, double min, double max) {
- if (bins < 2)
- throw new IllegalArgumentException("Must have at least 2 bins.");
- this.min = min;
- this.max = max;
- this.bins = bins;
- this.bucketWidth = (max - min) / (bins - 2);
- }
-
- public int bins() {
- return this.bins;
- }
-
- public double fromBin(int b) {
- if (b == 0)
- return Double.NEGATIVE_INFINITY;
- else if (b == bins - 1)
- return Double.POSITIVE_INFINITY;
- else
- return min + (b - 1) * bucketWidth;
- }
-
- public int toBin(double x) {
- if (x < min)
- return 0;
- else if (x > max)
- return bins - 1;
- else
- return (int) ((x - min) / bucketWidth) + 1;
- }
- }
-
- public static class LinearBinScheme implements BinScheme {
- private final int bins;
- private final double max;
- private final double scale;
-
- public LinearBinScheme(int numBins, double max) {
- this.bins = numBins;
- this.max = max;
- this.scale = max / (numBins * (numBins - 1) / 2);
- }
-
- public int bins() {
- return this.bins;
- }
-
- public double fromBin(int b) {
- if (b == this.bins - 1) {
- return Float.POSITIVE_INFINITY;
- } else {
- double unscaled = (b * (b + 1.0)) / 2.0;
- return unscaled * this.scale;
- }
- }
-
- public int toBin(double x) {
- if (x < 0.0d) {
- throw new IllegalArgumentException("Values less than 0.0 not accepted.");
- } else if (x > this.max) {
- return this.bins - 1;
- } else {
- double scaled = x / this.scale;
- return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Max.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Max.java
deleted file mode 100644
index 82390eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Max.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A {@link SampledStat} that gives the max over its samples.
- */
-public final class Max extends SampledStat {
-
- public Max() {
- super(Double.NEGATIVE_INFINITY);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value = Math.max(sample.value, value);
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double max = Double.NEGATIVE_INFINITY;
- for (int i = 0; i < samples.size(); i++)
- max = Math.max(max, samples.get(i).value);
- return max;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Min.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Min.java
deleted file mode 100644
index 88fbe79..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Min.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A {@link SampledStat} that gives the min over its samples.
- */
-public class Min extends SampledStat {
-
- public Min() {
- super(Double.MIN_VALUE);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value = Math.min(sample.value, value);
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double max = Double.MAX_VALUE;
- for (int i = 0; i < samples.size(); i++)
- max = Math.min(max, samples.get(i).value);
- return max;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentile.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentile.java
deleted file mode 100644
index 9a20965..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentile.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.MetricName;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class Percentile {
-
- private final MetricName name;
- private final double percentile;
-
- public Percentile(MetricName name, double percentile) {
- super();
- this.name = name;
- this.percentile = percentile;
- }
-
- public MetricName name() {
- return this.name;
- }
-
- public double percentile() {
- return this.percentile;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentiles.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentiles.java
deleted file mode 100644
index aa8c8df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Percentiles.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.CompoundStat;
-import org.apache.flink.kafka_backport.common.metrics.Measurable;
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-import org.apache.flink.kafka_backport.common.metrics.stats.Histogram.BinScheme;
-import org.apache.flink.kafka_backport.common.metrics.stats.Histogram.ConstantBinScheme;
-import org.apache.flink.kafka_backport.common.metrics.stats.Histogram.LinearBinScheme;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A compound stat that reports one or more percentiles
- */
-public class Percentiles extends SampledStat implements CompoundStat {
-
- public static enum BucketSizing {
- CONSTANT, LINEAR
- }
-
- private final int buckets;
- private final Percentile[] percentiles;
- private final BinScheme binScheme;
-
- public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
- this(sizeInBytes, 0.0, max, bucketing, percentiles);
- }
-
- public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
- super(0.0);
- this.percentiles = percentiles;
- this.buckets = sizeInBytes / 4;
- if (bucketing == BucketSizing.CONSTANT) {
- this.binScheme = new ConstantBinScheme(buckets, min, max);
- } else if (bucketing == BucketSizing.LINEAR) {
- if (min != 0.0d)
- throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
- this.binScheme = new LinearBinScheme(buckets, max);
- } else {
- throw new IllegalArgumentException("Unknown bucket type: " + bucketing);
- }
- }
-
- @Override
- public List<NamedMeasurable> stats() {
- List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
- for (Percentile percentile : this.percentiles) {
- final double pct = percentile.percentile();
- ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return value(config, now, pct / 100.0);
- }
- }));
- }
- return ms;
- }
-
- public double value(MetricConfig config, long now, double quantile) {
- purgeObsoleteSamples(config, now);
- float count = 0.0f;
- for (Sample sample : this.samples)
- count += sample.eventCount;
- if (count == 0.0f)
- return Double.NaN;
- float sum = 0.0f;
- float quant = (float) quantile;
- for (int b = 0; b < buckets; b++) {
- for (int s = 0; s < this.samples.size(); s++) {
- HistogramSample sample = (HistogramSample) this.samples.get(s);
- float[] hist = sample.histogram.counts();
- sum += hist[b];
- if (sum / count > quant)
- return binScheme.fromBin(b);
- }
- }
- return Double.POSITIVE_INFINITY;
- }
-
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- return value(config, now, 0.5);
- }
-
- @Override
- protected HistogramSample newSample(long timeMs) {
- return new HistogramSample(this.binScheme, timeMs);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
- HistogramSample hist = (HistogramSample) sample;
- hist.histogram.record(value);
- }
-
- private static class HistogramSample extends SampledStat.Sample {
- private final Histogram histogram;
-
- private HistogramSample(BinScheme scheme, long now) {
- super(0.0, now);
- this.histogram = new Histogram(scheme);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Rate.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Rate.java
deleted file mode 100644
index 0eaa167..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Rate.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-import org.apache.flink.kafka_backport.common.metrics.MeasurableStat;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
- * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
- * however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or other such
- * values.
- */
-public class Rate implements MeasurableStat {
-
- private final TimeUnit unit;
- private final SampledStat stat;
-
- public Rate() {
- this(TimeUnit.SECONDS);
- }
-
- public Rate(TimeUnit unit) {
- this(unit, new SampledTotal());
- }
-
- public Rate(SampledStat stat) {
- this(TimeUnit.SECONDS, stat);
- }
-
- public Rate(TimeUnit unit, SampledStat stat) {
- this.stat = stat;
- this.unit = unit;
- }
-
- public String unitName() {
- return unit.name().substring(0, unit.name().length() - 2).toLowerCase();
- }
-
- @Override
- public void record(MetricConfig config, double value, long timeMs) {
- this.stat.record(config, value, timeMs);
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- double value = stat.measure(config, now);
- double elapsed = convert(now - stat.oldest(now).lastWindowMs);
- return value / elapsed;
- }
-
- private double convert(long time) {
- switch (unit) {
- case NANOSECONDS:
- return time * 1000.0 * 1000.0;
- case MICROSECONDS:
- return time * 1000.0;
- case MILLISECONDS:
- return time;
- case SECONDS:
- return time / 1000.0;
- case MINUTES:
- return time / (60.0 * 1000.0);
- case HOURS:
- return time / (60.0 * 60.0 * 1000.0);
- case DAYS:
- return time / (24.0 * 60.0 * 60.0 * 1000.0);
- default:
- throw new IllegalStateException("Unknown unit: " + unit);
- }
- }
-
- public static class SampledTotal extends SampledStat {
-
- public SampledTotal() {
- super(0.0d);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- for (int i = 0; i < samples.size(); i++)
- total += samples.get(i).value;
- return total;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/SampledStat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/SampledStat.java
deleted file mode 100644
index 57548eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/SampledStat.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.MeasurableStat;
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A SampledStat records a single scalar value measured over one or more samples. Each sample covers a configurable
- * window that can be bounded by a number of events, by elapsed time, or by both; when both bounds are given the
- * window is complete as soon as <i>either</i> the event count or the elapsed time criterion is met.
- * <p>
- * All the samples are combined to produce the measurement. When the current window is complete, recording moves on
- * to the next slot (indexed modulo {@code config.samples()}): a new sample is created if that slot does not exist
- * yet, otherwise the sample stored there is reset and reused.
- * <p>
- * Subclasses define the concrete statistic by implementing {@code update} and {@code combine}.
- */
-public abstract class SampledStat implements MeasurableStat {
-
-    /** Starting value handed to every sample created through {@link #newSample(long)}. */
-    private double initialValue;
-    /** Index into {@link #samples} of the sample currently being recorded into. */
-    private int current = 0;
-    /** The samples created so far; the list is filled lazily. */
-    protected List<Sample> samples;
-
-    /**
-     * @param initialValue the value each sample created by this stat starts with
-     */
-    public SampledStat(double initialValue) {
-        this.samples = new ArrayList<Sample>(2);
-        this.initialValue = initialValue;
-    }
-
-    /**
-     * Records one value: moves on to the next sample if the current window is complete, lets the subclass fold the
-     * value into the active sample, and then counts the event.
-     */
-    @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        Sample active = current(timeMs);
-        if (active.isComplete(timeMs, config)) {
-            active = advance(config, timeMs);
-        }
-        update(active, config, value, timeMs);
-        active.eventCount++;
-    }
-
-    /**
-     * Steps the current index to the next slot and returns the sample to record into: the existing sample of that
-     * slot after a reset, or a newly appended one if the slot has not been created yet.
-     */
-    private Sample advance(MetricConfig config, long timeMs) {
-        this.current = (this.current + 1) % config.samples();
-        if (this.current < this.samples.size()) {
-            Sample recycled = current(timeMs);
-            recycled.reset(timeMs);
-            return recycled;
-        }
-        Sample fresh = newSample(timeMs);
-        this.samples.add(fresh);
-        return fresh;
-    }
-
-    /** Creates a sample that starts at this stat's initial value with its window beginning at {@code timeMs}. */
-    protected Sample newSample(long timeMs) {
-        return new Sample(this.initialValue, timeMs);
-    }
-
-    /** Resets samples that have expired and then combines all samples into the measurement. */
-    @Override
-    public double measure(MetricConfig config, long now) {
-        purgeObsoleteSamples(config, now);
-        return combine(samples, config, now);
-    }
-
-    /** Returns the sample at the current index, creating the very first sample if none exists yet. */
-    public Sample current(long timeMs) {
-        if (samples.isEmpty()) {
-            samples.add(newSample(timeMs));
-        }
-        return samples.get(current);
-    }
-
-    /**
-     * Returns the sample with the smallest {@code lastWindowMs}, creating the very first sample if none exists yet.
-     * On ties the sample that comes first in the list wins.
-     */
-    public Sample oldest(long now) {
-        if (samples.isEmpty()) {
-            samples.add(newSample(now));
-        }
-        Sample earliest = samples.get(0);
-        for (Sample candidate : samples) {
-            if (candidate.lastWindowMs < earliest.lastWindowMs) {
-                earliest = candidate;
-            }
-        }
-        return earliest;
-    }
-
-    /** Folds a newly recorded value into the given sample. */
-    protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);
-
-    /** Combines the given samples into a single measurement. */
-    public abstract double combine(List<Sample> samples, MetricConfig config, long now);
-
-    /**
-     * Times out windows that have expired in the absence of any events: every sample whose window started at least
-     * {@code config.samples() * config.timeWindowMs()} before {@code now} is reset.
-     */
-    protected void purgeObsoleteSamples(MetricConfig config, long now) {
-        long expireAge = config.samples() * config.timeWindowMs();
-        for (Sample candidate : samples) {
-            long age = now - candidate.lastWindowMs;
-            if (age >= expireAge) {
-                candidate.reset(now);
-            }
-        }
-    }
-
-    /** One window's worth of recorded state. */
-    protected static class Sample {
-        public double initialValue;
-        public long eventCount;
-        public long lastWindowMs;
-        public double value;
-
-        public Sample(double initialValue, long now) {
-            this.initialValue = initialValue;
-            this.value = initialValue;
-            this.lastWindowMs = now;
-            this.eventCount = 0;
-        }
-
-        /** Starts a new window at {@code now}: clears the event count and restores the initial value. */
-        public void reset(long now) {
-            this.value = initialValue;
-            this.lastWindowMs = now;
-            this.eventCount = 0;
-        }
-
-        /**
-         * Tells whether this window is complete, i.e. the configured time window has elapsed or the configured
-         * number of events has been reached. The event bound is only consulted if the time bound is not met.
-         */
-        public boolean isComplete(long timeMs, MetricConfig config) {
-            if (timeMs - lastWindowMs >= config.timeWindowMs()) {
-                return true;
-            }
-            return eventCount >= config.eventWindow();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Total.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Total.java
deleted file mode 100644
index 9f338d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/stats/Total.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics.stats;
-
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-import org.apache.flink.kafka_backport.common.metrics.MeasurableStat;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * An un-windowed cumulative total maintained over all time. The configuration and timestamp arguments of
- * {@code record} and {@code measure} are not used.
- */
-public class Total implements MeasurableStat {
-
-    private double total;
-
-    /** Creates a total that starts at zero. */
-    public Total() {
-        this(0.0);
-    }
-
-    /**
-     * @param value the value the total starts from
-     */
-    public Total(double value) {
-        total = value;
-    }
-
-    /** Adds {@code value} to the running total. */
-    @Override
-    public void record(MetricConfig config, double value, long now) {
-        total = total + value;
-    }
-
-    /** Returns the running total accumulated so far. */
-    @Override
-    public double measure(MetricConfig config, long now) {
-        return total;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferReceive.java
deleted file mode 100644
index 47b5d65..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferReceive.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A receive backed by an array of ByteBuffers.
- * <p>
- * The receive tracks how many bytes the buffers can still take. It is complete once that count has dropped to zero,
- * mirroring how {@code ByteBufferSend} tracks the bytes it still has to write.
- */
-public class ByteBufferReceive implements Receive {
-
-    private final String source;
-    private final ByteBuffer[] buffers;
-    /** Number of bytes still to be read before all buffers are full. */
-    private int remaining;
-
-    /**
-     * @param source  identifier of where the data comes from, returned unchanged by {@link #source()}
-     * @param buffers the buffers to fill; their combined remaining capacity is the size of this receive
-     */
-    public ByteBufferReceive(String source, ByteBuffer... buffers) {
-        this.source = source;
-        this.buffers = buffers;
-        for (ByteBuffer buffer : buffers) {
-            remaining += buffer.remaining();
-        }
-    }
-
-    @Override
-    public String source() {
-        return source;
-    }
-
-    /**
-     * @return {@code true} once no bytes remain to be read (previously this was inverted and reported completion
-     *         while bytes were still outstanding)
-     */
-    @Override
-    public boolean complete() {
-        return remaining <= 0;
-    }
-
-    /**
-     * Reads from the channel into the buffers and reduces the outstanding byte count by the number of bytes read.
-     *
-     * @param channel the channel to read from
-     * @return the value returned by the channel's scattering read, including -1 at end-of-stream
-     * @throws IOException if the channel read fails
-     */
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        long read = channel.read(buffers);
-        // Bytes that arrived are no longer outstanding. An end-of-stream marker (-1) or an empty read must not
-        // change the count, so only positive results are subtracted.
-        if (read > 0) {
-            remaining -= read;
-        }
-        return read;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferSend.java
deleted file mode 100644
index 10cb50f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/ByteBufferSend.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A send backed by an array of byte buffers. The total number of bytes is captured at construction time and the
- * number still to be written is reduced by every successful write.
- */
-public class ByteBufferSend implements Send {
-
-    private final String destination;
-    protected final ByteBuffer[] buffers;
-    /** Bytes not yet written to a channel. */
-    private int remaining;
-    /** Total bytes held by the buffers when this send was created. */
-    private int size;
-
-    /**
-     * @param destination identifier of where the data goes, returned unchanged by {@link #destination()}
-     * @param buffers     the buffers to write; their combined remaining bytes are the size of this send
-     */
-    public ByteBufferSend(String destination, ByteBuffer... buffers) {
-        this.destination = destination;
-        this.buffers = buffers;
-        int pending = 0;
-        for (ByteBuffer buffer : buffers) {
-            pending += buffer.remaining();
-        }
-        this.remaining = pending;
-        this.size = pending;
-    }
-
-    @Override
-    public String destination() {
-        return destination;
-    }
-
-    /** Returns {@code true} once no bytes remain to be written. */
-    @Override
-    public boolean completed() {
-        return !(remaining > 0);
-    }
-
-    /** Returns the total number of bytes this send held at construction time. */
-    @Override
-    public long size() {
-        return size;
-    }
-
-    /**
-     * Writes the buffers to the channel and reduces the outstanding byte count by the number of bytes written.
-     *
-     * @param channel the channel to write to
-     * @return the number of bytes written by this call
-     * @throws EOFException if the channel reports a negative number of written bytes
-     * @throws IOException  if the channel write fails
-     */
-    @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
-        final long count = channel.write(buffers);
-        if (count >= 0) {
-            remaining -= count;
-            return count;
-        }
-        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/InvalidReceiveException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/InvalidReceiveException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/InvalidReceiveException.java
deleted file mode 100644
index 2b2553f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/InvalidReceiveException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A {@link KafkaException} subtype declared in the network package. Both constructors only forward their arguments
- * to the superclass.
- * NOTE(review): presumably thrown when a received message is malformed; the throw sites are outside this file,
- * so confirm against callers.
- */
-public class InvalidReceiveException extends KafkaException {
-
- /**
-  * @param message the detail message, passed to the superclass constructor
-  */
- public InvalidReceiveException(String message) {
- super(message);
- }
-
- /**
-  * @param message the detail message, passed to the superclass constructor
-  * @param cause the underlying cause, passed to the superclass constructor
-  */
- public InvalidReceiveException(String message, Throwable cause) {
- super(message, cause);
- }
-}