You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:23 UTC
[06/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add
comments to all backported kafka sources and move them to
'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java
deleted file mode 100644
index 709a868..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.utils.CopyOnWriteMap;
-import org.apache.kafka.copied.common.utils.SystemTime;
-import org.apache.kafka.copied.common.utils.Time;
-import org.apache.kafka.copied.common.utils.Utils;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A registry of sensors and metrics.
- * <p>
- * A metric is a named, numerical measurement. A sensor is a handle to record numerical measurements as they occur. Each
- * Sensor has zero or more associated metrics. For example a Sensor might represent message sizes and we might associate
- * with this sensor a metric for the average, maximum, or other statistics computed off the sequence of message sizes
- * that are recorded by the sensor.
- * <p>
- * Usage looks something like this:
- *
- * <pre>
- * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics");
- * sensor.add(metricName, new Avg());
- * metricName = new MetricName("message-size-max", "producer-metrics");
- * sensor.add(metricName, new Max());
- *
- * // as messages are sent we record the sizes
- * sensor.record(messageSize);
- * </pre>
- */
-public class Metrics implements Closeable {
-
- private final MetricConfig config;
- private final ConcurrentMap<MetricName, KafkaMetric> metrics;
- private final ConcurrentMap<String, Sensor> sensors;
- private final List<MetricsReporter> reporters;
- private final Time time;
-
- /**
- * Create a metrics repository with no metric reporters and default configuration.
- */
- public Metrics() {
- this(new MetricConfig());
- }
-
- /**
- * Create a metrics repository with no metric reporters, default configuration and the given time instance.
- */
- public Metrics(Time time) {
- this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
- }
-
- /**
- * Create a metrics repository with no reporters and the given default config. This config will be used for any
- * metric that doesn't override its own config.
- * @param defaultConfig The default config to use for all metrics that don't override their config
- */
- public Metrics(MetricConfig defaultConfig) {
- this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
- }
-
- /**
- * Create a metrics repository with a default config and the given metric reporters
- * @param defaultConfig The default config
- * @param reporters The metrics reporters
- * @param time The time instance to use with the metrics
- */
- public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
- this.config = defaultConfig;
- this.sensors = new CopyOnWriteMap<String, Sensor>();
- this.metrics = new CopyOnWriteMap<MetricName, KafkaMetric>();
- this.reporters = Utils.notNull(reporters);
- this.time = time;
- for (MetricsReporter reporter : reporters)
- reporter.init(new ArrayList<KafkaMetric>());
- }
-
- /**
- * Get the sensor with the given name if it exists
- * @param name The name of the sensor
- * @return Return the sensor or null if no such sensor exists
- */
- public Sensor getSensor(String name) {
- return this.sensors.get(Utils.notNull(name));
- }
-
- /**
- * Get or create a sensor with the given unique name and no parent sensors.
- * @param name The sensor name
- * @return The sensor
- */
- public Sensor sensor(String name) {
- return sensor(name, null, (Sensor[]) null);
- }
-
- /**
- * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
- * receive every value recorded with this sensor.
- * @param name The name of the sensor
- * @param parents The parent sensors
- * @return The existing sensor with this name, or the newly created one if none existed
- */
- public Sensor sensor(String name, Sensor... parents) {
- return sensor(name, null, parents);
- }
-
- /**
- * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
- * receive every value recorded with this sensor.
- * @param name The name of the sensor
- * @param config A default configuration to use for this sensor for metrics that don't have their own config
- * @param parents The parent sensors
- * @return The existing sensor with this name, or the newly created one if none existed
- */
- public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
- Sensor s = getSensor(name);
- if (s == null) {
- s = new Sensor(this, name, parents, config == null ? this.config : config, time);
- this.sensors.put(name, s);
- }
- return s;
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param metricName The name of the metric
- * @param measurable The measurable that will be measured by this metric
- */
- public void addMetric(MetricName metricName, Measurable measurable) {
- addMetric(metricName, null, measurable);
- }
-
- /**
- * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
- * This is a way to expose existing values as metrics.
- * @param metricName The name of the metric
- * @param config The configuration to use when measuring this measurable
- * @param measurable The measurable that will be measured by this metric
- */
- public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
- KafkaMetric m = new KafkaMetric(new Object(),
- Utils.notNull(metricName),
- Utils.notNull(measurable),
- config == null ? this.config : config,
- time);
- registerMetric(m);
- }
-
- /**
- * Add a MetricsReporter; it is first initialized with all metrics registered so far
- */
- public synchronized void addReporter(MetricsReporter reporter) {
- Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
- this.reporters.add(reporter);
- }
-
- synchronized void registerMetric(KafkaMetric metric) {
- MetricName metricName = metric.metricName();
- if (this.metrics.containsKey(metricName))
- throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
- this.metrics.put(metricName, metric);
- for (MetricsReporter reporter : reporters)
- reporter.metricChange(metric);
- }
-
- /**
- * Get all the metrics currently maintained indexed by metricName
- */
- public Map<MetricName, KafkaMetric> metrics() {
- return this.metrics;
- }
-
- /**
- * Close this metrics repository.
- */
- @Override
- public void close() {
- for (MetricsReporter reporter : this.reporters)
- reporter.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java
deleted file mode 100644
index 4f5b00d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.Configurable;
-
-import java.util.List;
-
-/**
- * A plugin interface to allow things to listen as new metrics are created so they can be reported.
- */
-public interface MetricsReporter extends Configurable {
-
- /**
- * This is called when the reporter is first registered to initially register all existing metrics
- * @param metrics All currently existing metrics
- */
- public void init(List<KafkaMetric> metrics);
-
- /**
- * This is called whenever a metric is updated or added
- * @param metric The metric that was updated or added
- */
- public void metricChange(KafkaMetric metric);
-
- /**
- * Called when the metrics repository is closed.
- */
- public void close();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java
deleted file mode 100644
index f9893a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-/**
- * An upper or lower bound for metrics
- */
-public final class Quota {
-
- private final boolean upper;
- private final double bound;
-
- public Quota(double bound, boolean upper) {
- this.bound = bound;
- this.upper = upper;
- }
-
- public static Quota lessThan(double upperBound) {
- return new Quota(upperBound, true);
- }
-
- public static Quota moreThan(double lowerBound) {
- return new Quota(lowerBound, false);
- }
-
- public boolean isUpperBound() {
- return this.upper;
- }
-
- public double bound() {
- return this.bound;
- }
-
- public boolean acceptable(double value) {
- return (upper && value <= bound) || (!upper && value >= bound);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java
deleted file mode 100644
index add99b9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.KafkaException;
-
-/**
- * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
- */
-public class QuotaViolationException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public QuotaViolationException(String m) {
- super(m);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java
deleted file mode 100644
index e4df999..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.utils.Time;
-import org.apache.kafka.copied.common.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
- * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
- * of metrics about message sizes such as the average or max.
- */
-public final class Sensor {
-
- private final Metrics registry;
- private final String name;
- private final Sensor[] parents;
- private final List<Stat> stats;
- private final List<KafkaMetric> metrics;
- private final MetricConfig config;
- private final Time time;
-
- Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
- super();
- this.registry = registry;
- this.name = Utils.notNull(name);
- this.parents = parents == null ? new Sensor[0] : parents;
- this.metrics = new ArrayList<KafkaMetric>();
- this.stats = new ArrayList<Stat>();
- this.config = config;
- this.time = time;
- checkForest(new HashSet<Sensor>());
- }
-
- /* Validate that this sensor doesn't end up referencing itself */
- private void checkForest(Set<Sensor> sensors) {
- if (!sensors.add(this))
- throw new IllegalArgumentException("Circular dependency in sensors: " + name() + " is its own parent.");
- for (int i = 0; i < parents.length; i++)
- parents[i].checkForest(sensors);
- }
-
- /**
- * The name this sensor is registered with. This name will be unique among all registered sensors.
- */
- public String name() {
- return this.name;
- }
-
- /**
- * Record an occurrence; this is just short-hand for {@link #record(double) record(1.0)}
- */
- public void record() {
- record(1.0);
- }
-
- /**
- * Record a value with this sensor
- * @param value The value to record
- * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
- * bound
- */
- public void record(double value) {
- record(value, time.milliseconds());
- }
-
- /**
- * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse
- * the time stamp.
- * @param value The value we are recording
- * @param timeMs The current POSIX time in milliseconds
- * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
- * bound
- */
- public void record(double value, long timeMs) {
- synchronized (this) {
- // increment all the stats
- for (int i = 0; i < this.stats.size(); i++)
- this.stats.get(i).record(config, value, timeMs);
- checkQuotas(timeMs);
- }
- for (int i = 0; i < parents.length; i++)
- parents[i].record(value, timeMs);
- }
-
- /**
- * Check if we have violated our quota for any metric that has a configured quota
- * @param timeMs The time in milliseconds at which each metric's value is evaluated
- */
- private void checkQuotas(long timeMs) {
- for (int i = 0; i < this.metrics.size(); i++) {
- KafkaMetric metric = this.metrics.get(i);
- MetricConfig config = metric.config();
- if (config != null) {
- Quota quota = config.quota();
- if (quota != null) {
- if (!quota.acceptable(metric.value(timeMs)))
- throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound());
- }
- }
- }
- }
-
- /**
- * Register a compound statistic with this sensor with no config override
- */
- public void add(CompoundStat stat) {
- add(stat, null);
- }
-
- /**
- * Register a compound statistic with this sensor which yields multiple measurable quantities (like a histogram)
- * @param stat The stat to register
- * @param config The configuration for this stat. If null then the stat will use the default configuration for this
- * sensor.
- */
- public synchronized void add(CompoundStat stat, MetricConfig config) {
- this.stats.add(Utils.notNull(stat));
- for (CompoundStat.NamedMeasurable m : stat.stats()) {
- KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(), config == null ? this.config : config, time);
- this.registry.registerMetric(metric);
- this.metrics.add(metric);
- }
- }
-
- /**
- * Register a metric with this sensor
- * @param metricName The name of the metric
- * @param stat The statistic to keep
- */
- public void add(MetricName metricName, MeasurableStat stat) {
- add(metricName, stat, null);
- }
-
- /**
- * Register a metric with this sensor
- * @param metricName The name of the metric
- * @param stat The statistic to keep
- * @param config A special configuration for this metric. If null use the sensor default configuration.
- */
- public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
- KafkaMetric metric = new KafkaMetric(new Object(),
- Utils.notNull(metricName),
- Utils.notNull(stat),
- config == null ? this.config : config,
- time);
- this.registry.registerMetric(metric);
- this.metrics.add(metric);
- this.stats.add(stat);
- }
-
- synchronized List<KafkaMetric> metrics() {
- return Collections.unmodifiableList(this.metrics);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java
deleted file mode 100644
index 67ee79b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-/**
- * A Stat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor
- */
-public interface Stat {
-
- /**
- * Record the given value
- * @param config The configuration to use for this metric
- * @param value The value to record
- * @param timeMs The POSIX time in milliseconds this value occurred
- */
- public void record(MetricConfig config, double value, long timeMs);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java
deleted file mode 100644
index b76f6fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that maintains a simple average over its samples.
- */
-public class Avg extends SampledStat {
-
- public Avg() {
- super(0.0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- long count = 0;
- for (int i = 0; i < samples.size(); i++) {
- Sample s = samples.get(i);
- total += s.value;
- count += s.eventCount;
- }
- return total / count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java
deleted file mode 100644
index 4fd1c57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that maintains a simple count of what it has seen.
- */
-public class Count extends SampledStat {
-
- public Count() {
- super(0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value += 1.0;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- for (int i = 0; i < samples.size(); i++)
- total += samples.get(i).value;
- return total;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java
deleted file mode 100644
index 13255c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-public class Histogram {
-
- private final BinScheme binScheme;
- private final float[] hist;
- private double count;
-
- public Histogram(BinScheme binScheme) {
- this.hist = new float[binScheme.bins()];
- this.count = 0.0f;
- this.binScheme = binScheme;
- }
-
- public void record(double value) {
- this.hist[binScheme.toBin(value)] += 1.0f;
- this.count += 1.0f;
- }
-
- public double value(double quantile) {
- if (count == 0.0d)
- return Double.NaN;
- float sum = 0.0f;
- float quant = (float) quantile;
- for (int i = 0; i < this.hist.length - 1; i++) {
- sum += this.hist[i];
- if (sum / count > quant)
- return binScheme.fromBin(i);
- }
- return Float.POSITIVE_INFINITY;
- }
-
- public float[] counts() {
- return this.hist;
- }
-
- public void clear() {
- for (int i = 0; i < this.hist.length; i++)
- this.hist[i] = 0.0f;
- this.count = 0;
- }
-
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder("{");
- for (int i = 0; i < this.hist.length - 1; i++) {
- b.append(String.format("%.10f", binScheme.fromBin(i)));
- b.append(':');
- b.append(String.format("%.0f", this.hist[i]));
- b.append(',');
- }
- b.append(Float.POSITIVE_INFINITY);
- b.append(':');
- b.append(this.hist[this.hist.length - 1]);
- b.append('}');
- return b.toString();
- }
-
- public interface BinScheme {
- public int bins();
-
- public int toBin(double value);
-
- public double fromBin(int bin);
- }
-
- public static class ConstantBinScheme implements BinScheme {
- private final double min;
- private final double max;
- private final int bins;
- private final double bucketWidth;
-
- public ConstantBinScheme(int bins, double min, double max) {
- if (bins < 2)
- throw new IllegalArgumentException("Must have at least 2 bins.");
- this.min = min;
- this.max = max;
- this.bins = bins;
- this.bucketWidth = (max - min) / (bins - 2);
- }
-
- public int bins() {
- return this.bins;
- }
-
- public double fromBin(int b) {
- if (b == 0)
- return Double.NEGATIVE_INFINITY;
- else if (b == bins - 1)
- return Double.POSITIVE_INFINITY;
- else
- return min + (b - 1) * bucketWidth;
- }
-
- public int toBin(double x) {
- if (x < min)
- return 0;
- else if (x > max)
- return bins - 1;
- else
- return (int) ((x - min) / bucketWidth) + 1;
- }
- }
-
- public static class LinearBinScheme implements BinScheme {
- private final int bins;
- private final double max;
- private final double scale;
-
- public LinearBinScheme(int numBins, double max) {
- this.bins = numBins;
- this.max = max;
- this.scale = max / (numBins * (numBins - 1) / 2);
- }
-
- public int bins() {
- return this.bins;
- }
-
- public double fromBin(int b) {
- if (b == this.bins - 1) {
- return Float.POSITIVE_INFINITY;
- } else {
- double unscaled = (b * (b + 1.0)) / 2.0;
- return unscaled * this.scale;
- }
- }
-
- public int toBin(double x) {
- if (x < 0.0d) {
- throw new IllegalArgumentException("Values less than 0.0 not accepted.");
- } else if (x > this.max) {
- return this.bins - 1;
- } else {
- double scaled = x / this.scale;
- return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java
deleted file mode 100644
index 8b1d8d0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that gives the max over its samples.
- */
-public final class Max extends SampledStat {
-
- public Max() {
- super(Double.NEGATIVE_INFINITY);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value = Math.max(sample.value, value);
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double max = Double.NEGATIVE_INFINITY;
- for (int i = 0; i < samples.size(); i++)
- max = Math.max(max, samples.get(i).value);
- return max;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java
deleted file mode 100644
index b4af5f8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that gives the min over its samples.
- */
-public class Min extends SampledStat {
-
- public Min() {
- super(Double.MIN_VALUE);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long now) {
- sample.value = Math.min(sample.value, value);
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double max = Double.MAX_VALUE;
- for (int i = 0; i < samples.size(); i++)
- max = Math.min(max, samples.get(i).value);
- return max;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java
deleted file mode 100644
index dac44eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.MetricName;
-
-public class Percentile {
-
- private final MetricName name;
- private final double percentile;
-
- public Percentile(MetricName name, double percentile) {
- super();
- this.name = name;
- this.percentile = percentile;
- }
-
- public MetricName name() {
- return this.name;
- }
-
- public double percentile() {
- return this.percentile;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java
deleted file mode 100644
index ed94418..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.CompoundStat;
-import org.apache.kafka.copied.common.metrics.Measurable;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-import org.apache.kafka.copied.common.metrics.stats.Histogram.BinScheme;
-import org.apache.kafka.copied.common.metrics.stats.Histogram.ConstantBinScheme;
-import org.apache.kafka.copied.common.metrics.stats.Histogram.LinearBinScheme;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A compound stat that reports one or more percentiles
- */
-public class Percentiles extends SampledStat implements CompoundStat {
-
- public static enum BucketSizing {
- CONSTANT, LINEAR
- }
-
- private final int buckets;
- private final Percentile[] percentiles;
- private final BinScheme binScheme;
-
- public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
- this(sizeInBytes, 0.0, max, bucketing, percentiles);
- }
-
- public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
- super(0.0);
- this.percentiles = percentiles;
- this.buckets = sizeInBytes / 4;
- if (bucketing == BucketSizing.CONSTANT) {
- this.binScheme = new ConstantBinScheme(buckets, min, max);
- } else if (bucketing == BucketSizing.LINEAR) {
- if (min != 0.0d)
- throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
- this.binScheme = new LinearBinScheme(buckets, max);
- } else {
- throw new IllegalArgumentException("Unknown bucket type: " + bucketing);
- }
- }
-
- @Override
- public List<NamedMeasurable> stats() {
- List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
- for (Percentile percentile : this.percentiles) {
- final double pct = percentile.percentile();
- ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return value(config, now, pct / 100.0);
- }
- }));
- }
- return ms;
- }
-
- public double value(MetricConfig config, long now, double quantile) {
- purgeObsoleteSamples(config, now);
- float count = 0.0f;
- for (Sample sample : this.samples)
- count += sample.eventCount;
- if (count == 0.0f)
- return Double.NaN;
- float sum = 0.0f;
- float quant = (float) quantile;
- for (int b = 0; b < buckets; b++) {
- for (int s = 0; s < this.samples.size(); s++) {
- HistogramSample sample = (HistogramSample) this.samples.get(s);
- float[] hist = sample.histogram.counts();
- sum += hist[b];
- if (sum / count > quant)
- return binScheme.fromBin(b);
- }
- }
- return Double.POSITIVE_INFINITY;
- }
-
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- return value(config, now, 0.5);
- }
-
- @Override
- protected HistogramSample newSample(long timeMs) {
- return new HistogramSample(this.binScheme, timeMs);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
- HistogramSample hist = (HistogramSample) sample;
- hist.histogram.record(value);
- }
-
- private static class HistogramSample extends SampledStat.Sample {
- private final Histogram histogram;
-
- private HistogramSample(BinScheme scheme, long now) {
- super(0.0, now);
- this.histogram = new Histogram(scheme);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java
deleted file mode 100644
index 2eb6d64..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MeasurableStat;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
- * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
- * however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or other such
- * values.
- */
-public class Rate implements MeasurableStat {
-
- private final TimeUnit unit;
- private final SampledStat stat;
-
- public Rate() {
- this(TimeUnit.SECONDS);
- }
-
- public Rate(TimeUnit unit) {
- this(unit, new SampledTotal());
- }
-
- public Rate(SampledStat stat) {
- this(TimeUnit.SECONDS, stat);
- }
-
- public Rate(TimeUnit unit, SampledStat stat) {
- this.stat = stat;
- this.unit = unit;
- }
-
- public String unitName() {
- return unit.name().substring(0, unit.name().length() - 2).toLowerCase();
- }
-
- @Override
- public void record(MetricConfig config, double value, long timeMs) {
- this.stat.record(config, value, timeMs);
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- double value = stat.measure(config, now);
- double elapsed = convert(now - stat.oldest(now).lastWindowMs);
- return value / elapsed;
- }
-
- private double convert(long time) {
- switch (unit) {
- case NANOSECONDS:
- return time * 1000.0 * 1000.0;
- case MICROSECONDS:
- return time * 1000.0;
- case MILLISECONDS:
- return time;
- case SECONDS:
- return time / 1000.0;
- case MINUTES:
- return time / (60.0 * 1000.0);
- case HOURS:
- return time / (60.0 * 60.0 * 1000.0);
- case DAYS:
- return time / (24.0 * 60.0 * 60.0 * 1000.0);
- default:
- throw new IllegalStateException("Unknown unit: " + unit);
- }
- }
-
- public static class SampledTotal extends SampledStat {
-
- public SampledTotal() {
- super(0.0d);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now) {
- double total = 0.0;
- for (int i = 0; i < samples.size(); i++)
- total += samples.get(i).value;
- return total;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java
deleted file mode 100644
index 6d53a89..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MeasurableStat;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
- * configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the
- * window is complete when <i>either</i> the event count or ellapsed time criterion is met).
- * <p>
- * All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
- * recycled to begin recording the next sample.
- *
- * Subclasses of this class define different statistics measured using this basic pattern.
- */
-public abstract class SampledStat implements MeasurableStat {
-
- private double initialValue;
- private int current = 0;
- protected List<Sample> samples;
-
- public SampledStat(double initialValue) {
- this.initialValue = initialValue;
- this.samples = new ArrayList<Sample>(2);
- }
-
- @Override
- public void record(MetricConfig config, double value, long timeMs) {
- Sample sample = current(timeMs);
- if (sample.isComplete(timeMs, config))
- sample = advance(config, timeMs);
- update(sample, config, value, timeMs);
- sample.eventCount += 1;
- }
-
- private Sample advance(MetricConfig config, long timeMs) {
- this.current = (this.current + 1) % config.samples();
- if (this.current >= samples.size()) {
- Sample sample = newSample(timeMs);
- this.samples.add(sample);
- return sample;
- } else {
- Sample sample = current(timeMs);
- sample.reset(timeMs);
- return sample;
- }
- }
-
- protected Sample newSample(long timeMs) {
- return new Sample(this.initialValue, timeMs);
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- purgeObsoleteSamples(config, now);
- return combine(this.samples, config, now);
- }
-
- public Sample current(long timeMs) {
- if (samples.size() == 0)
- this.samples.add(newSample(timeMs));
- return this.samples.get(this.current);
- }
-
- public Sample oldest(long now) {
- if (samples.size() == 0)
- this.samples.add(newSample(now));
- Sample oldest = this.samples.get(0);
- for (int i = 1; i < this.samples.size(); i++) {
- Sample curr = this.samples.get(i);
- if (curr.lastWindowMs < oldest.lastWindowMs)
- oldest = curr;
- }
- return oldest;
- }
-
- protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);
-
- public abstract double combine(List<Sample> samples, MetricConfig config, long now);
-
- /* Timeout any windows that have expired in the absence of any events */
- protected void purgeObsoleteSamples(MetricConfig config, long now) {
- long expireAge = config.samples() * config.timeWindowMs();
- for (int i = 0; i < samples.size(); i++) {
- Sample sample = this.samples.get(i);
- if (now - sample.lastWindowMs >= expireAge)
- sample.reset(now);
- }
- }
-
- protected static class Sample {
- public double initialValue;
- public long eventCount;
- public long lastWindowMs;
- public double value;
-
- public Sample(double initialValue, long now) {
- this.initialValue = initialValue;
- this.eventCount = 0;
- this.lastWindowMs = now;
- this.value = initialValue;
- }
-
- public void reset(long now) {
- this.eventCount = 0;
- this.lastWindowMs = now;
- this.value = initialValue;
- }
-
- public boolean isComplete(long timeMs, MetricConfig config) {
- return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java
deleted file mode 100644
index 98909b1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MeasurableStat;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-/**
- * An un-windowed cumulative total maintained over all time.
- */
-public class Total implements MeasurableStat {
-
- private double total;
-
- public Total() {
- this.total = 0.0;
- }
-
- public Total(double value) {
- this.total = value;
- }
-
- @Override
- public void record(MetricConfig config, double value, long now) {
- this.total += value;
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- return this.total;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java
deleted file mode 100644
index 6ae4dcd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A receive backed by an array of ByteBuffers
- */
-public class ByteBufferReceive implements Receive {
-
- private final String source;
- private final ByteBuffer[] buffers;
- private int remaining;
-
- public ByteBufferReceive(String source, ByteBuffer... buffers) {
- super();
- this.source = source;
- this.buffers = buffers;
- for (int i = 0; i < buffers.length; i++)
- remaining += buffers[i].remaining();
- }
-
- @Override
- public String source() {
- return source;
- }
-
- @Override
- public boolean complete() {
- return remaining > 0;
- }
-
- @Override
- public long readFrom(ScatteringByteChannel channel) throws IOException {
- long read = channel.read(buffers);
- remaining += read;
- return read;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java
deleted file mode 100644
index c573db5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A send whose payload is an array of byte buffers, written to the channel in order.
- */
-public class ByteBufferSend implements Send {
-
-    private final String destination;
-    protected final ByteBuffer[] buffers;
-    // Bytes not yet written to the channel.
-    private int remaining;
-    // Total number of bytes in this send, fixed at construction time.
-    private int size;
-
-    /**
-     * @param destination The identifier returned by {@link #destination()}
-     * @param buffers The buffers to write; their combined remaining bytes form the size of this send
-     */
-    public ByteBufferSend(String destination, ByteBuffer... buffers) {
-        super();
-        this.destination = destination;
-        this.buffers = buffers;
-        for (ByteBuffer buffer : buffers)
-            this.remaining += buffer.remaining();
-        this.size = this.remaining;
-    }
-
-    @Override
-    public String destination() {
-        return this.destination;
-    }
-
-    /**
-     * @return true once no bytes are left to write
-     */
-    @Override
-    public boolean completed() {
-        return !(remaining > 0);
-    }
-
-    /**
-     * @return The total number of bytes in this send, as computed at construction time
-     */
-    @Override
-    public long size() {
-        return size;
-    }
-
-    /**
-     * Write as much of the buffers as the channel accepts.
-     *
-     * @param channel The channel to write to
-     * @return The number of bytes written by this call
-     * @throws IOException If the write fails; an EOFException if the channel reports a negative count
-     */
-    @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
-        long bytesWritten = channel.write(buffers);
-        if (bytesWritten >= 0) {
-            remaining -= bytesWritten;
-            return bytesWritten;
-        }
-        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java
deleted file mode 100644
index 24dc983..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import org.apache.kafka.copied.common.KafkaException;
-
-/**
- * Thrown when a receive is rejected as invalid; carries a message and, optionally, the cause.
- */
-public class InvalidReceiveException extends KafkaException {
-
-    /**
-     * @param message Description of why the receive is invalid
-     * @param cause The underlying error
-     */
-    public InvalidReceiveException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param message Description of why the receive is invalid
-     */
-    public InvalidReceiveException(String message) {
-        super(message);
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java
deleted file mode 100644
index 38541e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import org.apache.kafka.copied.common.KafkaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A set of composite sends, sent one after another.
- *
- * <p>The sends are written in list order; the multi-send is completed once the last one is.
- */
-public class MultiSend implements Send {
-
-    private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
-    private String dest;
-    // Bytes written over the lifetime of this multi-send, across all writeTo calls.
-    private long totalWritten = 0;
-    private List<Send> sends;
-    private Iterator<Send> sendsIterator;
-    // The send currently being written; only meaningful while doneSends is false.
-    private Send current;
-    private boolean doneSends = false;
-    // Sum of the sizes of all sends, computed at construction time.
-    private long size = 0;
-
-    /**
-     * @param dest The identifier returned by {@link #destination()}
-     * @param sends The sends to write, in order
-     */
-    public MultiSend(String dest, List<Send> sends) {
-        this.dest = dest;
-        this.sends = sends;
-        this.sendsIterator = sends.iterator();
-        nextSendOrDone();
-        for (Send send : sends)
-            this.size += send.size();
-    }
-
-    @Override
-    public long size() {
-        return size;
-    }
-
-    @Override
-    public String destination() {
-        return dest;
-    }
-
-    /**
-     * @return true once every send has been written; logs an error if the byte count written differs
-     *         from the expected total size
-     */
-    @Override
-    public boolean completed() {
-        if (doneSends) {
-            if (totalWritten != size)
-                log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * Write the current send and, whenever one finishes, continue with the next one in the same call.
-     *
-     * @param channel The channel to write to
-     * @return The number of bytes written by this call
-     * @throws IOException If a write to the channel fails
-     * @throws KafkaException If this multi-send is already completed
-     */
-    @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
-        if (completed())
-            throw new KafkaException("This operation cannot be completed on a complete request.");
-
-        // Kept as a long: an int accumulator would silently truncate the long counts added to it.
-        long totalWrittenPerCall = 0;
-        boolean sendComplete;
-        do {
-            long written = current.writeTo(channel);
-            totalWritten += written;
-            totalWrittenPerCall += written;
-            sendComplete = current.completed();
-            if (sendComplete)
-                nextSendOrDone();
-        } while (!completed() && sendComplete);
-        if (log.isTraceEnabled())
-            log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall + ", Total bytes written so far : " + totalWritten + ", Expected bytes to write : " + size);
-        return totalWrittenPerCall;
-    }
-
-    // update current if there's a next Send, mark sends as done if there isn't
-    private void nextSendOrDone() {
-        if (sendsIterator.hasNext())
-            current = sendsIterator.next();
-        else
-            doneSends = true;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java
deleted file mode 100644
index 6b065f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkReceive implements Receive {
-
-    public final static String UNKNOWN_SOURCE = "";
-    public final static int UNLIMITED = -1;
-
-    private final String source;
-    // Holds the 4 byte size header; null when the payload buffer was supplied by the caller.
-    private final ByteBuffer size;
-    private final int maxSize;
-    // Payload buffer; null until the size header has been read (unless supplied by the caller).
-    private ByteBuffer buffer;
-
-
-    /**
-     * Create a receive around a caller-supplied payload buffer; no size header is read.
-     *
-     * @param source The identifier returned by {@link #source()}
-     * @param buffer The payload buffer
-     */
-    public NetworkReceive(String source, ByteBuffer buffer) {
-        this.source = source;
-        this.buffer = buffer;
-        this.size = null;
-        this.maxSize = UNLIMITED;
-    }
-
-    /**
-     * Create a receive with no limit on the payload size.
-     *
-     * @param source The identifier returned by {@link #source()}
-     */
-    public NetworkReceive(String source) {
-        this(UNLIMITED, source);
-    }
-
-    /**
-     * @param maxSize The largest accepted payload size, or {@link #UNLIMITED}
-     * @param source The identifier returned by {@link #source()}
-     */
-    public NetworkReceive(int maxSize, String source) {
-        this.source = source;
-        this.size = ByteBuffer.allocate(4);
-        this.buffer = null;
-        this.maxSize = maxSize;
-    }
-
-    public NetworkReceive() {
-        this(UNKNOWN_SOURCE);
-    }
-
-    @Override
-    public String source() {
-        return source;
-    }
-
-    /**
-     * @return true once the size header (if any) and the whole payload have been read
-     */
-    @Override
-    public boolean complete() {
-        // A null size means there is no header to read; a null buffer means the payload has not started.
-        boolean sizeRead = size == null || !size.hasRemaining();
-        return sizeRead && buffer != null && !buffer.hasRemaining();
-    }
-
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        return readFromReadableChannel(channel);
-    }
-
-    // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
-    // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
-    // This can go away after we get rid of BlockingChannel
-    /**
-     * Read the size header first and then the payload, as far as the channel delivers bytes.
-     *
-     * @param channel The channel to read from
-     * @return The number of bytes read by this call (header and payload combined)
-     * @throws IOException If the read fails; an EOFException if the channel reports end of stream
-     * @throws InvalidReceiveException If the announced size is negative or exceeds the configured maximum
-     */
-    @Deprecated
-    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
-        int read = 0;
-        if (size != null && size.hasRemaining()) {
-            int bytesRead = channel.read(size);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-            if (!size.hasRemaining()) {
-                // Header complete: decode the announced size and allocate the payload buffer.
-                size.rewind();
-                int receiveSize = size.getInt();
-                if (receiveSize < 0)
-                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
-                if (maxSize != UNLIMITED && receiveSize > maxSize)
-                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
-                this.buffer = ByteBuffer.allocate(receiveSize);
-            }
-        }
-        if (buffer != null) {
-            int bytesRead = channel.read(buffer);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-        }
-
-        return read;
-    }
-
-    /**
-     * @return The payload buffer; null if it has not been allocated yet
-     */
-    public ByteBuffer payload() {
-        return this.buffer;
-    }
-
-    // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel
-    /**
-     * Keep reading from the channel until this receive is complete.
-     *
-     * @param channel The channel to read from
-     * @return The total number of bytes read
-     * @throws IOException If a read fails
-     */
-    @Deprecated
-    public long readCompletely(ReadableByteChannel channel) throws IOException {
-        int totalRead = 0;
-        while (!complete()) {
-            totalRead += readFromReadableChannel(channel);
-        }
-        return totalRead;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java
deleted file mode 100644
index 1c8438c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.nio.ByteBuffer;
-
-/**
- * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkSend extends ByteBufferSend {
-
- public NetworkSend(String destination, ByteBuffer... buffers) {
- super(destination, sizeDelimit(buffers));
- }
-
- private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
- int size = 0;
- for (int i = 0; i < buffers.length; i++)
- size += buffers[i].remaining();
- ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
- delimited[0] = ByteBuffer.allocate(4);
- delimited[0].putInt(size);
- delimited[0].rewind();
- System.arraycopy(buffers, 0, delimited, 1, buffers.length);
- return delimited;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java
deleted file mode 100644
index cb66b57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.IOException;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * This interface models the in-progress reading of data from a channel, tagged with the String id of its source
- */
-public interface Receive {
-
- /**
- * The id of the source from which we are receiving data.
- * @return The source id, as a String
- */
- public String source();
-
- /**
- * Are we done receiving data?
- * @return true if the receive is complete
- */
- public boolean complete();
-
- /**
- * Read bytes into this receive from the given channel
- * @param channel The channel to read from
- * @return The number of bytes read
- * @throws IOException If the reading fails
- */
- public long readFrom(ScatteringByteChannel channel) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java
deleted file mode 100644
index b5bc46e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * An interface for asynchronous, multi-channel network I/O
- */
-public interface Selectable {
-
- /**
- * Begin establishing a socket connection to the given address, identified by the given id
- * @param id The id for this connection
- * @param address The address to connect to
- * @param sendBufferSize The send buffer for the socket
- * @param receiveBufferSize The receive buffer for the socket
- * @throws IOException If we cannot begin connecting
- */
- public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
-
- /**
- * Begin disconnecting the connection identified by the given id
- * @param id The id of the connection to disconnect
- */
- public void disconnect(String id);
-
- /**
- * Wakeup this selector if it is blocked on I/O
- */
- public void wakeup();
-
- /**
- * Close this selector
- */
- public void close();
-
- /**
- * Queue the given request for sending in the subsequent {@link #poll(long)} calls
- * @param send The request to send
- */
- public void send(Send send);
-
- /**
- * Do I/O. Reads, writes, connection establishment, etc.
- * @param timeout The amount of time to block if there is nothing to do
- * @throws IOException If the I/O fails
- */
- public void poll(long timeout) throws IOException;
-
- /**
- * The list of sends that completed on the last {@link #poll(long) poll()} call.
- */
- public List<Send> completedSends();
-
- /**
- * The list of receives that completed on the last {@link #poll(long) poll()} call.
- */
- public List<NetworkReceive> completedReceives();
-
- /**
- * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
- * call.
- */
- public List<String> disconnected();
-
- /**
- * The list of connections that completed their connection on the last {@link #poll(long) poll()}
- * call.
- */
- public List<String> connected();
-
- /**
- * Disable reads from the given connection
- * @param id The id for the connection
- */
- public void mute(String id);
-
- /**
- * Re-enable reads from the given connection
- * @param id The id for the connection
- */
- public void unmute(String id);
-
- /**
- * Disable reads from all connections
- */
- public void muteAll();
-
- /**
- * Re-enable reads from all connections
- */
- public void unmuteAll();
-
-}
\ No newline at end of file