You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:23 UTC

[06/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java
deleted file mode 100644
index 709a868..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Metrics.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.utils.CopyOnWriteMap;
-import org.apache.kafka.copied.common.utils.SystemTime;
-import org.apache.kafka.copied.common.utils.Time;
-import org.apache.kafka.copied.common.utils.Utils;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A registry of sensors and metrics.
- * <p>
- * A metric is a named, numerical measurement. A sensor is a handle to record numerical measurements as they occur. Each
- * Sensor has zero or more associated metrics. For example a Sensor might represent message sizes and we might associate
- * with this sensor a metric for the average, maximum, or other statistics computed off the sequence of message sizes
- * that are recorded by the sensor.
- * <p>
- * Usage looks something like this:
- * 
- * <pre>
- * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
- * MetricName metricName = new MetricName(&quot;message-size-avg&quot;, &quot;producer-metrics&quot;);
- * sensor.add(metricName, new Avg());
- * metricName = new MetricName(&quot;message-size-max&quot;, &quot;producer-metrics&quot;);
- * sensor.add(metricName, new Max());
- * 
- * // as messages are sent we record the sizes
- * sensor.record(messageSize);
- * </pre>
- */
-public class Metrics implements Closeable {
-
-    private final MetricConfig config;
-    private final ConcurrentMap<MetricName, KafkaMetric> metrics;
-    private final ConcurrentMap<String, Sensor> sensors;
-    private final List<MetricsReporter> reporters;
-    private final Time time;
-
-    /**
-     * Create a metrics repository with no metric reporters and default configuration.
-     */
-    public Metrics() {
-        this(new MetricConfig());
-    }
-
-    /**
-     * Create a metrics repository with no metric reporters, the default configuration, and the given time instance.
-     */
-    public Metrics(Time time) {
-        this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
-    }
-
-    /**
-     * Create a metrics repository with no reporters and the given default config. This config will be used for any
-     * metric that doesn't override its own config.
-     * @param defaultConfig The default config to use for all metrics that don't override their config
-     */
-    public Metrics(MetricConfig defaultConfig) {
-        this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
-    }
-
-    /**
-     * Create a metrics repository with a default config and the given metric reporters
-     * @param defaultConfig The default config
-     * @param reporters The metrics reporters
-     * @param time The time instance to use with the metrics
-     */
-    public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
-        this.config = defaultConfig;
-        this.sensors = new CopyOnWriteMap<String, Sensor>();
-        this.metrics = new CopyOnWriteMap<MetricName, KafkaMetric>();
-        this.reporters = Utils.notNull(reporters);
-        this.time = time;
-        for (MetricsReporter reporter : reporters)
-            reporter.init(new ArrayList<KafkaMetric>());
-    }
-
-    /**
-     * Get the sensor with the given name if it exists
-     * @param name The name of the sensor
-     * @return Return the sensor or null if no such sensor exists
-     */
-    public Sensor getSensor(String name) {
-        return this.sensors.get(Utils.notNull(name));
-    }
-
-    /**
-     * Get or create a sensor with the given unique name and no parent sensors.
-     * @param name The sensor name
-     * @return The sensor
-     */
-    public Sensor sensor(String name) {
-        return sensor(name, null, (Sensor[]) null);
-    }
-
-    /**
-     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
-     * receive every value recorded with this sensor.
-     * @param name The name of the sensor
-     * @param parents The parent sensors
-     * @return The existing sensor with this name, or the newly created one if none existed
-     */
-    public Sensor sensor(String name, Sensor... parents) {
-        return sensor(name, null, parents);
-    }
-
-    /**
-     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
-     * receive every value recorded with this sensor.
-     * @param name The name of the sensor
-     * @param config A default configuration to use for this sensor for metrics that don't have their own config
-     * @param parents The parent sensors
-     * @return The existing sensor with this name, or the newly created one if none existed
-     */
-    public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
-        Sensor s = getSensor(name);
-        if (s == null) {
-            s = new Sensor(this, name, parents, config == null ? this.config : config, time);
-            this.sensors.put(name, s);
-        }
-        return s;
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param metricName The name of the metric
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public void addMetric(MetricName metricName, Measurable measurable) {
-        addMetric(metricName, null, measurable);
-    }
-
-    /**
-     * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
-     * This is a way to expose existing values as metrics.
-     * @param metricName The name of the metric
-     * @param config The configuration to use when measuring this measurable
-     * @param measurable The measurable that will be measured by this metric
-     */
-    public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
-        KafkaMetric m = new KafkaMetric(new Object(),
-                                        Utils.notNull(metricName),
-                                        Utils.notNull(measurable),
-                                        config == null ? this.config : config,
-                                        time);
-        registerMetric(m);
-    }
-
-    /**
-     * Add a MetricsReporter
-     */
-    public synchronized void addReporter(MetricsReporter reporter) {
-        Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values()));
-        this.reporters.add(reporter);
-    }
-
-    synchronized void registerMetric(KafkaMetric metric) {
-        MetricName metricName = metric.metricName();
-        if (this.metrics.containsKey(metricName))
-            throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
-        this.metrics.put(metricName, metric);
-        for (MetricsReporter reporter : reporters)
-            reporter.metricChange(metric);
-    }
-
-    /**
-     * Get all the metrics currently maintained indexed by metricName
-     */
-    public Map<MetricName, KafkaMetric> metrics() {
-        return this.metrics;
-    }
-
-    /**
-     * Close this metrics repository.
-     */
-    @Override
-    public void close() {
-        for (MetricsReporter reporter : this.reporters)
-            reporter.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java
deleted file mode 100644
index 4f5b00d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/MetricsReporter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.Configurable;
-
-import java.util.List;
-
-/**
- * A plugin interface to allow things to listen as new metrics are created so they can be reported.
- */
-public interface MetricsReporter extends Configurable {
-
-    /**
-     * This is called when the reporter is first registered to initially register all existing metrics
-     * @param metrics All currently existing metrics
-     */
-    public void init(List<KafkaMetric> metrics);
-
-    /**
-     * This is called whenever a metric is updated or added
-     * @param metric The metric that was updated or added
-     */
-    public void metricChange(KafkaMetric metric);
-
-    /**
-     * Called when the metrics repository is closed.
-     */
-    public void close();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java
deleted file mode 100644
index f9893a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Quota.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-/**
- * An upper or lower bound for metrics
- */
-public final class Quota {
-
-    private final boolean upper;
-    private final double bound;
-
-    public Quota(double bound, boolean upper) {
-        this.bound = bound;
-        this.upper = upper;
-    }
-
-    public static Quota lessThan(double upperBound) {
-        return new Quota(upperBound, true);
-    }
-
-    public static Quota moreThan(double lowerBound) {
-        return new Quota(lowerBound, false);
-    }
-
-    public boolean isUpperBound() {
-        return this.upper;
-    }
-
-    public double bound() {
-        return this.bound;
-    }
-
-    public boolean acceptable(double value) {
-        return (upper && value <= bound) || (!upper && value >= bound);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java
deleted file mode 100644
index add99b9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/QuotaViolationException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.KafkaException;
-
-/**
- * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
- */
-public class QuotaViolationException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public QuotaViolationException(String m) {
-        super(m);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java
deleted file mode 100644
index e4df999..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Sensor.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.utils.Time;
-import org.apache.kafka.copied.common.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
- * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
- * of metrics about message sizes such as the average or max.
- */
-public final class Sensor {
-
-    private final Metrics registry;
-    private final String name;
-    private final Sensor[] parents;
-    private final List<Stat> stats;
-    private final List<KafkaMetric> metrics;
-    private final MetricConfig config;
-    private final Time time;
-
-    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
-        super();
-        this.registry = registry;
-        this.name = Utils.notNull(name);
-        this.parents = parents == null ? new Sensor[0] : parents;
-        this.metrics = new ArrayList<KafkaMetric>();
-        this.stats = new ArrayList<Stat>();
-        this.config = config;
-        this.time = time;
-        checkForest(new HashSet<Sensor>());
-    }
-
-    /* Validate that this sensor doesn't end up referencing itself */
-    private void checkForest(Set<Sensor> sensors) {
-        if (!sensors.add(this))
-            throw new IllegalArgumentException("Circular dependency in sensors: " + name() + " is its own parent.");
-        for (int i = 0; i < parents.length; i++)
-            parents[i].checkForest(sensors);
-    }
-
-    /**
-     * The name this sensor is registered with. This name will be unique among all registered sensors.
-     */
-    public String name() {
-        return this.name;
-    }
-
-    /**
-     * Record an occurrence; this is just short-hand for {@link #record(double) record(1.0)}
-     */
-    public void record() {
-        record(1.0);
-    }
-
-    /**
-     * Record a value with this sensor
-     * @param value The value to record
-     * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
-     *         bound
-     */
-    public void record(double value) {
-        record(value, time.milliseconds());
-    }
-
-    /**
-     * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse
-     * the time stamp.
-     * @param value The value we are recording
-     * @param timeMs The current POSIX time in milliseconds
-     * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
-     *         bound
-     */
-    public void record(double value, long timeMs) {
-        synchronized (this) {
-            // increment all the stats
-            for (int i = 0; i < this.stats.size(); i++)
-                this.stats.get(i).record(config, value, timeMs);
-            checkQuotas(timeMs);
-        }
-        for (int i = 0; i < parents.length; i++)
-            parents[i].record(value, timeMs);
-    }
-
-    /**
-     * Check if we have violated our quota for any metric that has a configured quota
-     * @param timeMs The time in milliseconds at which each metric's value is evaluated
-     */
-    private void checkQuotas(long timeMs) {
-        for (int i = 0; i < this.metrics.size(); i++) {
-            KafkaMetric metric = this.metrics.get(i);
-            MetricConfig config = metric.config();
-            if (config != null) {
-                Quota quota = config.quota();
-                if (quota != null) {
-                    if (!quota.acceptable(metric.value(timeMs)))
-                        throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound());
-                }
-            }
-        }
-    }
-
-    /**
-     * Register a compound statistic with this sensor with no config override
-     */
-    public void add(CompoundStat stat) {
-        add(stat, null);
-    }
-
-    /**
-     * Register a compound statistic with this sensor which yields multiple measurable quantities (like a histogram)
-     * @param stat The stat to register
-     * @param config The configuration for this stat. If null then the stat will use the default configuration for this
-     *        sensor.
-     */
-    public synchronized void add(CompoundStat stat, MetricConfig config) {
-        this.stats.add(Utils.notNull(stat));
-        for (CompoundStat.NamedMeasurable m : stat.stats()) {
-            KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(), config == null ? this.config : config, time);
-            this.registry.registerMetric(metric);
-            this.metrics.add(metric);
-        }
-    }
-
-    /**
-     * Register a metric with this sensor
-     * @param metricName The name of the metric
-     * @param stat The statistic to keep
-     */
-    public void add(MetricName metricName, MeasurableStat stat) {
-        add(metricName, stat, null);
-    }
-
-    /**
-     * Register a metric with this sensor
-     * @param metricName The name of the metric
-     * @param stat The statistic to keep
-     * @param config A special configuration for this metric. If null use the sensor default configuration.
-     */
-    public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
-        KafkaMetric metric = new KafkaMetric(new Object(),
-                                             Utils.notNull(metricName),
-                                             Utils.notNull(stat),
-                                             config == null ? this.config : config,
-                                             time);
-        this.registry.registerMetric(metric);
-        this.metrics.add(metric);
-        this.stats.add(stat);
-    }
-
-    synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(this.metrics);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java
deleted file mode 100644
index 67ee79b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/Stat.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics;
-
-/**
- * A Stat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor
- */
-public interface Stat {
-
-    /**
-     * Record the given value
-     * @param config The configuration to use for this metric
-     * @param value The value to record
-     * @param timeMs The POSIX time in milliseconds this value occurred
-     */
-    public void record(MetricConfig config, double value, long timeMs);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java
deleted file mode 100644
index b76f6fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Avg.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that maintains a simple average over its samples.
- */
-public class Avg extends SampledStat {
-
-    public Avg() {
-        super(0.0);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value += value;
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double total = 0.0;
-        long count = 0;
-        for (int i = 0; i < samples.size(); i++) {
-            Sample s = samples.get(i);
-            total += s.value;
-            count += s.eventCount;
-        }
-        return total / count;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java
deleted file mode 100644
index 4fd1c57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Count.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that maintains a simple count of what it has seen.
- */
-public class Count extends SampledStat {
-
-    public Count() {
-        super(0);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value += 1.0;
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double total = 0.0;
-        for (int i = 0; i < samples.size(); i++)
-            total += samples.get(i).value;
-        return total;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java
deleted file mode 100644
index 13255c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Histogram.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-public class Histogram {
-
-    private final BinScheme binScheme;
-    private final float[] hist;
-    private double count;
-
-    public Histogram(BinScheme binScheme) {
-        this.hist = new float[binScheme.bins()];
-        this.count = 0.0f;
-        this.binScheme = binScheme;
-    }
-
-    public void record(double value) {
-        this.hist[binScheme.toBin(value)] += 1.0f;
-        this.count += 1.0f;
-    }
-
-    public double value(double quantile) {
-        if (count == 0.0d)
-            return Double.NaN;
-        float sum = 0.0f;
-        float quant = (float) quantile;
-        for (int i = 0; i < this.hist.length - 1; i++) {
-            sum += this.hist[i];
-            if (sum / count > quant)
-                return binScheme.fromBin(i);
-        }
-        return Float.POSITIVE_INFINITY;
-    }
-
-    public float[] counts() {
-        return this.hist;
-    }
-
-    public void clear() {
-        for (int i = 0; i < this.hist.length; i++)
-            this.hist[i] = 0.0f;
-        this.count = 0;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder b = new StringBuilder("{");
-        for (int i = 0; i < this.hist.length - 1; i++) {
-            b.append(String.format("%.10f", binScheme.fromBin(i)));
-            b.append(':');
-            b.append(String.format("%.0f", this.hist[i]));
-            b.append(',');
-        }
-        b.append(Float.POSITIVE_INFINITY);
-        b.append(':');
-        b.append(this.hist[this.hist.length - 1]);
-        b.append('}');
-        return b.toString();
-    }
-
-    public interface BinScheme {
-        public int bins();
-
-        public int toBin(double value);
-
-        public double fromBin(int bin);
-    }
-
-    public static class ConstantBinScheme implements BinScheme {
-        private final double min;
-        private final double max;
-        private final int bins;
-        private final double bucketWidth;
-
-        public ConstantBinScheme(int bins, double min, double max) {
-            if (bins < 2)
-                throw new IllegalArgumentException("Must have at least 2 bins.");
-            this.min = min;
-            this.max = max;
-            this.bins = bins;
-            this.bucketWidth = (max - min) / (bins - 2);
-        }
-
-        public int bins() {
-            return this.bins;
-        }
-
-        public double fromBin(int b) {
-            if (b == 0)
-                return Double.NEGATIVE_INFINITY;
-            else if (b == bins - 1)
-                return Double.POSITIVE_INFINITY;
-            else
-                return min + (b - 1) * bucketWidth;
-        }
-
-        public int toBin(double x) {
-            if (x < min)
-                return 0;
-            else if (x > max)
-                return bins - 1;
-            else
-                return (int) ((x - min) / bucketWidth) + 1;
-        }
-    }
-
-    public static class LinearBinScheme implements BinScheme {
-        private final int bins;
-        private final double max;
-        private final double scale;
-
-        public LinearBinScheme(int numBins, double max) {
-            this.bins = numBins;
-            this.max = max;
-            this.scale = max / (numBins * (numBins - 1) / 2);
-        }
-
-        public int bins() {
-            return this.bins;
-        }
-
-        public double fromBin(int b) {
-            if (b == this.bins - 1) {
-                return Float.POSITIVE_INFINITY;
-            } else {
-                double unscaled = (b * (b + 1.0)) / 2.0;
-                return unscaled * this.scale;
-            }
-        }
-
-        public int toBin(double x) {
-            if (x < 0.0d) {
-                throw new IllegalArgumentException("Values less than 0.0 not accepted.");
-            } else if (x > this.max) {
-                return this.bins - 1;
-            } else {
-                double scaled = x / this.scale;
-                return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java
deleted file mode 100644
index 8b1d8d0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Max.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that gives the max over its samples.
- */
-public final class Max extends SampledStat {
-
-    public Max() {
-        super(Double.NEGATIVE_INFINITY);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value = Math.max(sample.value, value);
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double max = Double.NEGATIVE_INFINITY;
-        for (int i = 0; i < samples.size(); i++)
-            max = Math.max(max, samples.get(i).value);
-        return max;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java
deleted file mode 100644
index b4af5f8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Min.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-
-/**
- * A {@link SampledStat} that gives the min over its samples.
- */
-public class Min extends SampledStat {
-
-    public Min() {
-        super(Double.MIN_VALUE);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value = Math.min(sample.value, value);
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double max = Double.MAX_VALUE;
-        for (int i = 0; i < samples.size(); i++)
-            max = Math.min(max, samples.get(i).value);
-        return max;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java
deleted file mode 100644
index dac44eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentile.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.MetricName;
-
-public class Percentile {
-
-    private final MetricName name;
-    private final double percentile;
-
-    public Percentile(MetricName name, double percentile) {
-        super();
-        this.name = name;
-        this.percentile = percentile;
-    }
-
-    public MetricName name() {
-        return this.name;
-    }
-
-    public double percentile() {
-        return this.percentile;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java
deleted file mode 100644
index ed94418..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Percentiles.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.CompoundStat;
-import org.apache.kafka.copied.common.metrics.Measurable;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-import org.apache.kafka.copied.common.metrics.stats.Histogram.BinScheme;
-import org.apache.kafka.copied.common.metrics.stats.Histogram.ConstantBinScheme;
-import org.apache.kafka.copied.common.metrics.stats.Histogram.LinearBinScheme;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A compound stat that reports one or more percentiles
- */
-public class Percentiles extends SampledStat implements CompoundStat {
-
-    public static enum BucketSizing {
-        CONSTANT, LINEAR
-    }
-
-    private final int buckets;
-    private final Percentile[] percentiles;
-    private final BinScheme binScheme;
-
-    public Percentiles(int sizeInBytes, double max, BucketSizing bucketing, Percentile... percentiles) {
-        this(sizeInBytes, 0.0, max, bucketing, percentiles);
-    }
-
-    public Percentiles(int sizeInBytes, double min, double max, BucketSizing bucketing, Percentile... percentiles) {
-        super(0.0);
-        this.percentiles = percentiles;
-        this.buckets = sizeInBytes / 4;
-        if (bucketing == BucketSizing.CONSTANT) {
-            this.binScheme = new ConstantBinScheme(buckets, min, max);
-        } else if (bucketing == BucketSizing.LINEAR) {
-            if (min != 0.0d)
-                throw new IllegalArgumentException("Linear bucket sizing requires min to be 0.0.");
-            this.binScheme = new LinearBinScheme(buckets, max);
-        } else {
-            throw new IllegalArgumentException("Unknown bucket type: " + bucketing);
-        }
-    }
-
-    @Override
-    public List<NamedMeasurable> stats() {
-        List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
-        for (Percentile percentile : this.percentiles) {
-            final double pct = percentile.percentile();
-            ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return value(config, now, pct / 100.0);
-                }
-            }));
-        }
-        return ms;
-    }
-
-    public double value(MetricConfig config, long now, double quantile) {
-        purgeObsoleteSamples(config, now);
-        float count = 0.0f;
-        for (Sample sample : this.samples)
-            count += sample.eventCount;
-        if (count == 0.0f)
-            return Double.NaN;
-        float sum = 0.0f;
-        float quant = (float) quantile;
-        for (int b = 0; b < buckets; b++) {
-            for (int s = 0; s < this.samples.size(); s++) {
-                HistogramSample sample = (HistogramSample) this.samples.get(s);
-                float[] hist = sample.histogram.counts();
-                sum += hist[b];
-                if (sum / count > quant)
-                    return binScheme.fromBin(b);
-            }
-        }
-        return Double.POSITIVE_INFINITY;
-    }
-
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        return value(config, now, 0.5);
-    }
-
-    @Override
-    protected HistogramSample newSample(long timeMs) {
-        return new HistogramSample(this.binScheme, timeMs);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
-        HistogramSample hist = (HistogramSample) sample;
-        hist.histogram.record(value);
-    }
-
-    private static class HistogramSample extends SampledStat.Sample {
-        private final Histogram histogram;
-
-        private HistogramSample(BinScheme scheme, long now) {
-            super(0.0, now);
-            this.histogram = new Histogram(scheme);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java
deleted file mode 100644
index 2eb6d64..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Rate.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MeasurableStat;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
- * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
- * however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or other such
- * values.
- */
-public class Rate implements MeasurableStat {
-
-    private final TimeUnit unit;
-    private final SampledStat stat;
-
-    public Rate() {
-        this(TimeUnit.SECONDS);
-    }
-
-    public Rate(TimeUnit unit) {
-        this(unit, new SampledTotal());
-    }
-
-    public Rate(SampledStat stat) {
-        this(TimeUnit.SECONDS, stat);
-    }
-
-    public Rate(TimeUnit unit, SampledStat stat) {
-        this.stat = stat;
-        this.unit = unit;
-    }
-
-    public String unitName() {
-        return unit.name().substring(0, unit.name().length() - 2).toLowerCase();
-    }
-
-    @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        this.stat.record(config, value, timeMs);
-    }
-
-    @Override
-    public double measure(MetricConfig config, long now) {
-        double value = stat.measure(config, now);
-        double elapsed = convert(now - stat.oldest(now).lastWindowMs);
-        return value / elapsed;
-    }
-
-    private double convert(long time) {
-        switch (unit) {
-            case NANOSECONDS:
-                return time * 1000.0 * 1000.0;
-            case MICROSECONDS:
-                return time * 1000.0;
-            case MILLISECONDS:
-                return time;
-            case SECONDS:
-                return time / 1000.0;
-            case MINUTES:
-                return time / (60.0 * 1000.0);
-            case HOURS:
-                return time / (60.0 * 60.0 * 1000.0);
-            case DAYS:
-                return time / (24.0 * 60.0 * 60.0 * 1000.0);
-            default:
-                throw new IllegalStateException("Unknown unit: " + unit);
-        }
-    }
-
-    public static class SampledTotal extends SampledStat {
-
-        public SampledTotal() {
-            super(0.0d);
-        }
-
-        @Override
-        protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
-            sample.value += value;
-        }
-
-        @Override
-        public double combine(List<Sample> samples, MetricConfig config, long now) {
-            double total = 0.0;
-            for (int i = 0; i < samples.size(); i++)
-                total += samples.get(i).value;
-            return total;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java
deleted file mode 100644
index 6d53a89..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/SampledStat.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MeasurableStat;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
- * configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the
- * window is complete when <i>either</i> the event count or ellapsed time criterion is met).
- * <p>
- * All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
- * recycled to begin recording the next sample.
- * 
- * Subclasses of this class define different statistics measured using this basic pattern.
- */
-public abstract class SampledStat implements MeasurableStat {
-
-    private double initialValue;
-    private int current = 0;
-    protected List<Sample> samples;
-
-    public SampledStat(double initialValue) {
-        this.initialValue = initialValue;
-        this.samples = new ArrayList<Sample>(2);
-    }
-
-    @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        Sample sample = current(timeMs);
-        if (sample.isComplete(timeMs, config))
-            sample = advance(config, timeMs);
-        update(sample, config, value, timeMs);
-        sample.eventCount += 1;
-    }
-
-    private Sample advance(MetricConfig config, long timeMs) {
-        this.current = (this.current + 1) % config.samples();
-        if (this.current >= samples.size()) {
-            Sample sample = newSample(timeMs);
-            this.samples.add(sample);
-            return sample;
-        } else {
-            Sample sample = current(timeMs);
-            sample.reset(timeMs);
-            return sample;
-        }
-    }
-
-    protected Sample newSample(long timeMs) {
-        return new Sample(this.initialValue, timeMs);
-    }
-
-    @Override
-    public double measure(MetricConfig config, long now) {
-        purgeObsoleteSamples(config, now);
-        return combine(this.samples, config, now);
-    }
-
-    public Sample current(long timeMs) {
-        if (samples.size() == 0)
-            this.samples.add(newSample(timeMs));
-        return this.samples.get(this.current);
-    }
-
-    public Sample oldest(long now) {
-        if (samples.size() == 0)
-            this.samples.add(newSample(now));
-        Sample oldest = this.samples.get(0);
-        for (int i = 1; i < this.samples.size(); i++) {
-            Sample curr = this.samples.get(i);
-            if (curr.lastWindowMs < oldest.lastWindowMs)
-                oldest = curr;
-        }
-        return oldest;
-    }
-
-    protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);
-
-    public abstract double combine(List<Sample> samples, MetricConfig config, long now);
-
-    /* Timeout any windows that have expired in the absence of any events */
-    protected void purgeObsoleteSamples(MetricConfig config, long now) {
-        long expireAge = config.samples() * config.timeWindowMs();
-        for (int i = 0; i < samples.size(); i++) {
-            Sample sample = this.samples.get(i);
-            if (now - sample.lastWindowMs >= expireAge)
-                sample.reset(now);
-        }
-    }
-
-    protected static class Sample {
-        public double initialValue;
-        public long eventCount;
-        public long lastWindowMs;
-        public double value;
-
-        public Sample(double initialValue, long now) {
-            this.initialValue = initialValue;
-            this.eventCount = 0;
-            this.lastWindowMs = now;
-            this.value = initialValue;
-        }
-
-        public void reset(long now) {
-            this.eventCount = 0;
-            this.lastWindowMs = now;
-            this.value = initialValue;
-        }
-
-        public boolean isComplete(long timeMs, MetricConfig config) {
-            return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java
deleted file mode 100644
index 98909b1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/metrics/stats/Total.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.metrics.stats;
-
-import org.apache.kafka.copied.common.metrics.MeasurableStat;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-
-/**
- * An un-windowed cumulative total maintained over all time.
- */
-public class Total implements MeasurableStat {
-
-    private double total;
-
-    public Total() {
-        this.total = 0.0;
-    }
-
-    public Total(double value) {
-        this.total = value;
-    }
-
-    @Override
-    public void record(MetricConfig config, double value, long now) {
-        this.total += value;
-    }
-
-    @Override
-    public double measure(MetricConfig config, long now) {
-        return this.total;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java
deleted file mode 100644
index 6ae4dcd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferReceive.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A receive backed by an array of ByteBuffers. The receive is complete once the space the buffers had available at
- * construction time has been filled.
- */
-public class ByteBufferReceive implements Receive {
-
-    private final String source;
-    private final ByteBuffer[] buffers;
-    // Number of bytes that still have to be read before all buffers are full.
-    private int remaining;
-
-    /**
-     * @param source The id of the source the data is read from
-     * @param buffers The buffers to fill; their combined remaining space is the size of this receive
-     */
-    public ByteBufferReceive(String source, ByteBuffer... buffers) {
-        this.source = source;
-        this.buffers = buffers;
-        for (ByteBuffer buffer : buffers)
-            remaining += buffer.remaining();
-    }
-
-    @Override
-    public String source() {
-        return source;
-    }
-
-    /**
-     * @return true once no bytes are outstanding any more, i.e. all buffers have been filled
-     */
-    @Override
-    public boolean complete() {
-        return remaining <= 0;
-    }
-
-    /**
-     * Reads the bytes the channel currently offers into the buffers.
-     *
-     * @param channel The channel to read from
-     * @return The value returned by the channel read: the number of bytes read, or -1 at end-of-stream
-     * @throws IOException If the reading fails
-     */
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        long read = channel.read(buffers);
-        // Only a positive count consumes outstanding bytes; the end-of-stream marker (-1) must not change the counter.
-        if (read > 0)
-            remaining -= read;
-        return read;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java
deleted file mode 100644
index c573db5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/ByteBufferSend.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A send whose payload is an array of byte buffers that are written to the channel together.
- */
-public class ByteBufferSend implements Send {
-
-    private final String destination;
-    protected final ByteBuffer[] buffers;
-    // Bytes that have not been written to the channel yet.
-    private int remaining;
-    // Total number of bytes held by the buffers at construction time.
-    private int size;
-
-    /**
-     * @param destination The id of the destination of this send
-     * @param buffers The buffers to write; their combined remaining bytes make up the size of the send
-     */
-    public ByteBufferSend(String destination, ByteBuffer... buffers) {
-        this.destination = destination;
-        this.buffers = buffers;
-        int pending = 0;
-        for (ByteBuffer buffer : buffers)
-            pending += buffer.remaining();
-        this.remaining = pending;
-        this.size = pending;
-    }
-
-    @Override
-    public String destination() {
-        return destination;
-    }
-
-    /**
-     * @return true once every byte counted at construction time has been written
-     */
-    @Override
-    public boolean completed() {
-        return !(remaining > 0);
-    }
-
-    @Override
-    public long size() {
-        return size;
-    }
-
-    /**
-     * Writes the buffers to the channel and deducts the written bytes from the outstanding count.
-     *
-     * @param channel The channel to write to
-     * @return The number of bytes written by this call
-     * @throws IOException If the write fails, or an EOFException if the channel reports a negative count
-     */
-    @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
-        final long bytesWritten = channel.write(buffers);
-        if (bytesWritten >= 0) {
-            remaining -= bytesWritten;
-            return bytesWritten;
-        }
-        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java
deleted file mode 100644
index 24dc983..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/InvalidReceiveException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import org.apache.kafka.copied.common.KafkaException;
-
-/**
- * Indicates an invalid receive, e.g. a size prefix that is negative or larger than the configured maximum.
- */
-public class InvalidReceiveException extends KafkaException {
-
-    /**
-     * @param message Description of why the receive is invalid
-     * @param cause The underlying error that led to this exception
-     */
-    public InvalidReceiveException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param message Description of why the receive is invalid
-     */
-    public InvalidReceiveException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java
deleted file mode 100644
index 38541e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/MultiSend.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import org.apache.kafka.copied.common.KafkaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A set of composite sends, sent one after another. The combined send is done once the last contained send has
- * completed.
- */
-public class MultiSend implements Send {
-
-    private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
-    private String dest;
-    // Bytes written over all writeTo() calls so far.
-    private long totalWritten = 0;
-    private List<Send> sends;
-    private Iterator<Send> sendsIterator;
-    // The send that is currently being written; stays at the last send once all sends are done.
-    private Send current;
-    private boolean doneSends = false;
-    // Sum of the sizes of all contained sends.
-    private long size = 0;
-
-    /**
-     * @param dest The id of the destination of this send
-     * @param sends The sends to write, in the order in which they are written
-     */
-    public MultiSend(String dest, List<Send> sends) {
-        this.dest = dest;
-        this.sends = sends;
-        this.sendsIterator = sends.iterator();
-        nextSendOrDone();
-        for (Send send: sends)
-            this.size += send.size();
-    }
-
-    @Override
-    public long size() {
-        return size;
-    }
-
-    @Override
-    public String destination() {
-        return dest;
-    }
-
-    /**
-     * @return true once all contained sends have completed; logs an error if the number of bytes written differs
-     *         from the expected size at that point
-     */
-    @Override
-    public boolean completed() {
-        if (doneSends) {
-            if (totalWritten != size)
-                log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * Writes the current send and moves on to the following sends for as long as each one completes within this
-     * call.
-     *
-     * @param channel The channel to write to
-     * @return The number of bytes written by this call
-     * @throws IOException If a contained send fails to write
-     * @throws KafkaException If this send has already completed
-     */
-    @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
-        if (completed())
-            throw new KafkaException("This operation cannot be completed on a complete request.");
-
-        // Accumulate as long: the contained sends report long counts and an int accumulator would silently narrow.
-        long totalWrittenPerCall = 0;
-        boolean sendComplete;
-        do {
-            long written = current.writeTo(channel);
-            totalWritten += written;
-            totalWrittenPerCall += written;
-            sendComplete = current.completed();
-            if (sendComplete)
-                nextSendOrDone();
-        } while (!completed() && sendComplete);
-        if (log.isTraceEnabled())
-            log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall + ", Total bytes written so far : " + totalWritten + ", Expected bytes to write : " + size);
-        return totalWrittenPerCall;
-    }
-
-    // update current if there's a next Send, mark sends as done if there isn't
-    private void nextSendOrDone() {
-        if (sendsIterator.hasNext())
-            current = sendsIterator.next();
-        else
-            doneSends = true;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java
deleted file mode 100644
index 6b065f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkReceive.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkReceive implements Receive {
-
-    public final static String UNKNOWN_SOURCE = "";
-    public final static int UNLIMITED = -1;
-
-    private final String source;
-    // Holds the 4 byte size prefix; null when the receive was created directly around a payload buffer.
-    private final ByteBuffer size;
-    private final int maxSize;
-    // Holds the payload; for size-delimited receives it is allocated once the size prefix has been read.
-    private ByteBuffer buffer;
-
-
-    /**
-     * Creates a receive around an existing payload buffer. No size prefix is read for such a receive.
-     *
-     * @param source The id of the source
-     * @param buffer The payload buffer
-     */
-    public NetworkReceive(String source, ByteBuffer buffer) {
-        this.source = source;
-        this.buffer = buffer;
-        this.size = null;
-        this.maxSize = UNLIMITED;
-    }
-
-    /**
-     * Creates a size-delimited receive without a limit on the payload size.
-     *
-     * @param source The id of the source
-     */
-    public NetworkReceive(String source) {
-        this.source = source;
-        this.size = ByteBuffer.allocate(4);
-        this.buffer = null;
-        this.maxSize = UNLIMITED;
-    }
-
-    /**
-     * Creates a size-delimited receive that rejects payloads larger than the given maximum.
-     *
-     * @param maxSize The largest accepted payload size, or {@link #UNLIMITED}
-     * @param source The id of the source
-     */
-    public NetworkReceive(int maxSize, String source) {
-        this.source = source;
-        this.size = ByteBuffer.allocate(4);
-        this.buffer = null;
-        this.maxSize = maxSize;
-    }
-
-    /**
-     * Creates a size-delimited receive with {@link #UNKNOWN_SOURCE} as source and no size limit.
-     */
-    public NetworkReceive() {
-        this(UNKNOWN_SOURCE);
-    }
-
-    @Override
-    public String source() {
-        return source;
-    }
-
-    /**
-     * @return true once the size prefix (if this receive has one) and the payload buffer are both full
-     */
-    @Override
-    public boolean complete() {
-        // A receive created around an existing payload buffer has no size prefix to wait for.
-        if (size != null && size.hasRemaining())
-            return false;
-        return !buffer.hasRemaining();
-    }
-
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        return readFromReadableChannel(channel);
-    }
-
-    // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
-    // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
-    // This can go away after we get rid of BlockingChannel
-    /**
-     * Reads the size prefix first (when this receive has one) and then the payload.
-     *
-     * @param channel The channel to read from
-     * @return The number of bytes read by this call
-     * @throws EOFException If the channel has reached end-of-stream
-     * @throws InvalidReceiveException If the announced size is negative or larger than the configured maximum
-     * @throws IOException If the reading fails
-     */
-    @Deprecated
-    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
-        long read = 0;
-        // size is null for receives created around an existing payload buffer; those skip the prefix.
-        if (size != null && size.hasRemaining()) {
-            int bytesRead = channel.read(size);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-            if (!size.hasRemaining()) {
-                size.rewind();
-                int receiveSize = size.getInt();
-                if (receiveSize < 0)
-                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
-                if (maxSize != UNLIMITED && receiveSize > maxSize)
-                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
-                this.buffer = ByteBuffer.allocate(receiveSize);
-            }
-        }
-        if (buffer != null) {
-            int bytesRead = channel.read(buffer);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-        }
-
-        return read;
-    }
-
-    /**
-     * @return The payload buffer; null while the size prefix of a size-delimited receive has not been read yet
-     */
-    public ByteBuffer payload() {
-        return this.buffer;
-    }
-
-    // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel
-    /**
-     * Reads from the channel until this receive is complete.
-     *
-     * @param channel The channel to read from
-     * @return The total number of bytes read
-     * @throws IOException If the reading fails
-     */
-    @Deprecated
-    public long readCompletely(ReadableByteChannel channel) throws IOException {
-        long totalRead = 0;
-        while (!complete()) {
-            totalRead += readFromReadableChannel(channel);
-        }
-        return totalRead;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java
deleted file mode 100644
index 1c8438c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/NetworkSend.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.nio.ByteBuffer;
-
-/**
- * A size delimited Send: a 4 byte network-ordered size N is written first, followed by the N bytes of content.
- */
-public class NetworkSend extends ByteBufferSend {
-
-    /**
-     * @param destination The id of the destination of this send
-     * @param buffers The content buffers; a buffer holding their combined size is placed in front of them
-     */
-    public NetworkSend(String destination, ByteBuffer... buffers) {
-        super(destination, sizeDelimit(buffers));
-    }
-
-    // Returns a new array made of a 4 byte size header followed by the given buffers.
-    private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
-        int payloadBytes = 0;
-        for (ByteBuffer buffer : buffers)
-            payloadBytes += buffer.remaining();
-
-        ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
-        ByteBuffer header = ByteBuffer.allocate(4);
-        header.putInt(payloadBytes);
-        // Rewind so that the header is written from its first byte.
-        header.rewind();
-        delimited[0] = header;
-        System.arraycopy(buffers, 0, delimited, 1, buffers.length);
-        return delimited;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java
deleted file mode 100644
index cb66b57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Receive.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.IOException;
-import java.nio.channels.ScatteringByteChannel;
-
-/**
- * This interface models the in-progress reading of data from a channel, from a source identified by a string id
- */
-public interface Receive {
-
-    /**
-     * The id of the source from which we are receiving data.
-     */
-    String source();
-
-    /**
-     * Are we done receiving data?
-     */
-    boolean complete();
-
-    /**
-     * Read bytes into this receive from the given channel
-     * @param channel The channel to read from
-     * @return The number of bytes read
-     * @throws IOException If the reading fails
-     */
-    long readFrom(ScatteringByteChannel channel) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java
deleted file mode 100644
index b5bc46e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selectable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * An interface for asynchronous, multi-channel network I/O
- */
-public interface Selectable {
-
-    /**
-     * Begin establishing a socket connection to the given address, identified by the given id
-     * @param id The id for this connection
-     * @param address The address to connect to
-     * @param sendBufferSize The send buffer for the socket
-     * @param receiveBufferSize The receive buffer for the socket
-     * @throws IOException If we cannot begin connecting
-     */
-    void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
-
-    /**
-     * Begin disconnecting the connection identified by the given id
-     * @param id The id for the connection
-     */
-    void disconnect(String id);
-
-    /**
-     * Wakeup this selector if it is blocked on I/O
-     */
-    void wakeup();
-
-    /**
-     * Close this selector
-     */
-    void close();
-
-    /**
-     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
-     * @param send The request to send
-     */
-    void send(Send send);
-
-    /**
-     * Do I/O. Reads, writes, connection establishment, etc.
-     * @param timeout The amount of time to block if there is nothing to do
-     * @throws IOException If the I/O fails
-     */
-    void poll(long timeout) throws IOException;
-
-    /**
-     * The list of sends that completed on the last {@link #poll(long) poll()} call.
-     */
-    List<Send> completedSends();
-
-    /**
-     * The list of receives that completed on the last {@link #poll(long) poll()} call.
-     */
-    List<NetworkReceive> completedReceives();
-
-    /**
-     * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
-     * call.
-     */
-    List<String> disconnected();
-
-    /**
-     * The list of connections that completed their connection on the last {@link #poll(long) poll()}
-     * call.
-     */
-    List<String> connected();
-
-    /**
-     * Disable reads from the given connection
-     * @param id The id for the connection
-     */
-    void mute(String id);
-
-    /**
-     * Re-enable reads from the given connection
-     * @param id The id for the connection
-     */
-    void unmute(String id);
-
-    /**
-     * Disable reads from all connections
-     */
-    void muteAll();
-
-    /**
-     * Re-enable reads from all connections
-     */
-    void unmuteAll();
-
-}
\ No newline at end of file