You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:26:00 UTC

[43/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java
deleted file mode 100644
index 7a90171..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A set of composite sends, sent one after another
- */
-public class MultiSend implements Send {
-
-    private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
-    private String dest;
-    private long totalWritten = 0;
-    private List<Send> sends;
-    private Iterator<Send> sendsIterator;
-    private Send current;
-    private boolean doneSends = false;
-    private long size = 0;
-
-    public MultiSend(String dest, List<Send> sends) {
-        this.dest = dest;
-        this.sends = sends;
-        this.sendsIterator = sends.iterator();
-        nextSendOrDone();
-        for (Send send: sends)
-            this.size += send.size();
-    }
-
-    @Override
-    public long size() {
-        return size;
-    }
-
-    @Override
-    public String destination() {
-        return dest;
-    }
-
-    @Override
-    public boolean completed() {
-        if (doneSends) {
-            if (totalWritten != size)
-                log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public long writeTo(GatheringByteChannel channel) throws IOException {
-        if (completed())
-            throw new KafkaException("This operation cannot be completed on a complete request.");
-
-        int totalWrittenPerCall = 0;
-        boolean sendComplete = false;
-        do {
-            long written = current.writeTo(channel);
-            totalWritten += written;
-            totalWrittenPerCall += written;
-            sendComplete = current.completed();
-            if (sendComplete)
-                nextSendOrDone();
-        } while (!completed() && sendComplete);
-        if (log.isTraceEnabled())
-            log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall +  "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + size);
-        return totalWrittenPerCall;
-    }
-
-    // update current if there's a next Send, mark sends as done if there isn't
-    private void nextSendOrDone() {
-        if (sendsIterator.hasNext())
-            current = sendsIterator.next();
-        else
-            doneSends = true;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
deleted file mode 100644
index c0d5d99..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkReceive implements Receive {
-
-    public final static String UNKNOWN_SOURCE = "";
-    public final static int UNLIMITED = -1;
-
-    private final String source;
-    private final ByteBuffer size;
-    private final int maxSize;
-    private ByteBuffer buffer;
-
-
-    public NetworkReceive(String source, ByteBuffer buffer) {
-        this.source = source;
-        this.buffer = buffer;
-        this.size = null;
-        this.maxSize = UNLIMITED;
-    }
-
-    public NetworkReceive(String source) {
-        this.source = source;
-        this.size = ByteBuffer.allocate(4);
-        this.buffer = null;
-        this.maxSize = UNLIMITED;
-    }
-
-    public NetworkReceive(int maxSize, String source) {
-        this.source = source;
-        this.size = ByteBuffer.allocate(4);
-        this.buffer = null;
-        this.maxSize = maxSize;
-    }
-
-    public NetworkReceive() {
-        this(UNKNOWN_SOURCE);
-    }
-
-    @Override
-    public String source() {
-        return source;
-    }
-
-    @Override
-    public boolean complete() {
-        return !size.hasRemaining() && !buffer.hasRemaining();
-    }
-
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
-        return readFromReadableChannel(channel);
-    }
-
-    // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
-    // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
-    // This can go away after we get rid of BlockingChannel
-    @Deprecated
-    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
-        int read = 0;
-        if (size.hasRemaining()) {
-            int bytesRead = channel.read(size);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-            if (!size.hasRemaining()) {
-                size.rewind();
-                int receiveSize = size.getInt();
-                if (receiveSize < 0)
-                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
-                if (maxSize != UNLIMITED && receiveSize > maxSize)
-                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
-                this.buffer = ByteBuffer.allocate(receiveSize);
-            }
-        }
-        if (buffer != null) {
-            int bytesRead = channel.read(buffer);
-            if (bytesRead < 0)
-                throw new EOFException();
-            read += bytesRead;
-        }
-
-        return read;
-    }
-
-    public ByteBuffer payload() {
-        return this.buffer;
-    }
-
-    // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel
-    @Deprecated
-    public long readCompletely(ReadableByteChannel channel) throws IOException {
-        int totalRead = 0;
-        while (!complete()) {
-            totalRead += readFromReadableChannel(channel);
-        }
-        return totalRead;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
deleted file mode 100644
index 29ce09d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkSend extends ByteBufferSend {
-
-    public NetworkSend(String destination, ByteBuffer... buffers) {
-        super(destination, sizeDelimit(buffers));
-    }
-
-    private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
-        int size = 0;
-        for (int i = 0; i < buffers.length; i++)
-            size += buffers[i].remaining();
-        ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
-        delimited[0] = ByteBuffer.allocate(4);
-        delimited[0].putInt(size);
-        delimited[0].rewind();
-        System.arraycopy(buffers, 0, delimited, 1, buffers.length);
-        return delimited;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
deleted file mode 100644
index b799e7c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.IOException;
-import java.nio.channels.ScatteringByteChannel;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This interface models the in-progress reading of data from a channel whose source is identified by a string id
- */
-public interface Receive {
-
-    /**
-     * The id of the source from which we are receiving data.
-     */
-    public String source();
-
-    /**
-     * Are we done receiving data?
-     */
-    public boolean complete();
-
-    /**
-     * Read bytes into this receive from the given channel
-     * @param channel The channel to read from
-     * @return The number of bytes read
-     * @throws IOException If the reading fails
-     */
-    public long readFrom(ScatteringByteChannel channel) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
deleted file mode 100644
index 08da141..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * An interface for asynchronous, multi-channel network I/O
- */
-public interface Selectable {
-
-    /**
-     * Begin establishing a socket connection to the given address, identified by the given id
-     * @param id The id for this connection
-     * @param address The address to connect to
-     * @param sendBufferSize The send buffer for the socket
-     * @param receiveBufferSize The receive buffer for the socket
-     * @throws IOException If we cannot begin connecting
-     */
-    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
-
-    /**
-     * Begin disconnecting the connection identified by the given id
-     */
-    public void disconnect(String id);
-
-    /**
-     * Wakeup this selector if it is blocked on I/O
-     */
-    public void wakeup();
-
-    /**
-     * Close this selector
-     */
-    public void close();
-
-    /**
-     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
-     * @param send The request to send
-     */
-    public void send(Send send);
-
-    /**
-     * Do I/O. Reads, writes, connection establishment, etc.
-     * @param timeout The amount of time to block if there is nothing to do
-     * @throws IOException
-     */
-    public void poll(long timeout) throws IOException;
-
-    /**
-     * The list of sends that completed on the last {@link #poll(long) poll()} call.
-     */
-    public List<Send> completedSends();
-
-    /**
-     * The list of receives that completed on the last {@link #poll(long) poll()} call.
-     */
-    public List<NetworkReceive> completedReceives();
-
-    /**
-     * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
-     * call.
-     */
-    public List<String> disconnected();
-
-    /**
-     * The list of connections that completed their connection on the last {@link #poll(long) poll()}
-     * call.
-     */
-    public List<String> connected();
-
-    /**
-     * Disable reads from the given connection
-     * @param id The id for the connection
-     */
-    public void mute(String id);
-
-    /**
-     * Re-enable reads from the given connection
-     * @param id The id for the connection
-     */
-    public void unmute(String id);
-
-    /**
-     * Disable reads from all connections
-     */
-    public void muteAll();
-
-    /**
-     * Re-enable reads from all connections
-     */
-    public void unmuteAll();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
deleted file mode 100644
index a886e3b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.apache.flink.kafka_backport.common.metrics.Measurable;
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.apache.flink.kafka_backport.common.metrics.Metrics;
-import org.apache.flink.kafka_backport.common.metrics.Sensor;
-import org.apache.flink.kafka_backport.common.metrics.stats.Avg;
-import org.apache.flink.kafka_backport.common.metrics.stats.Count;
-import org.apache.flink.kafka_backport.common.metrics.stats.Max;
-import org.apache.flink.kafka_backport.common.metrics.stats.Rate;
-import org.apache.flink.kafka_backport.common.utils.SystemTime;
-import org.apache.flink.kafka_backport.common.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A nioSelector interface for doing non-blocking multi-connection network I/O.
- * <p>
- * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
- * responses.
- * <p>
- * A connection can be added to the nioSelector associated with a string id by doing
- * 
- * <pre>
- * nioSelector.connect(&quot;42&quot;, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
- * </pre>
- * 
- * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
- * the connection. The successful invocation of this method does not mean a valid connection has been established.
- * 
- * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
- * connections are all done using the <code>poll()</code> call. Requests are queued beforehand with <code>send()</code>.
- * 
- * <pre>
- * nioSelector.send(new NetworkSend(&quot;42&quot;, myBytes));
- * nioSelector.poll(TIMEOUT_MS);
- * </pre>
- * 
- * The nioSelector maintains several lists, available via various getters, that are reset by each call to
- * <code>poll()</code>.
- * 
- * This class is not thread safe!
- */
-public class Selector implements Selectable {
-
-    private static final Logger log = LoggerFactory.getLogger(Selector.class);
-
-    private final java.nio.channels.Selector nioSelector;
-    private final Map<String, SelectionKey> keys;
-    private final List<Send> completedSends;
-    private final List<NetworkReceive> completedReceives;
-    private final List<String> disconnected;
-    private final List<String> connected;
-    private final List<String> failedSends;
-    private final Time time;
-    private final SelectorMetrics sensors;
-    private final String metricGrpPrefix;
-    private final Map<String, String> metricTags;
-    private final Map<String, Long> lruConnections;
-    private final long connectionsMaxIdleNanos;
-    private final int maxReceiveSize;
-    private final boolean metricsPerConnection;
-    private long currentTimeNanos;
-    private long nextIdleCloseCheckTime;
-
-
-    /**
-     * Create a new nioSelector
-     */
-    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
-        try {
-            this.nioSelector = java.nio.channels.Selector.open();
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-        this.maxReceiveSize = maxReceiveSize;
-        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
-        this.time = time;
-        this.metricGrpPrefix = metricGrpPrefix;
-        this.metricTags = metricTags;
-        this.keys = new HashMap<String, SelectionKey>();
-        this.completedSends = new ArrayList<Send>();
-        this.completedReceives = new ArrayList<NetworkReceive>();
-        this.connected = new ArrayList<String>();
-        this.disconnected = new ArrayList<String>();
-        this.failedSends = new ArrayList<String>();
-        this.sensors = new SelectorMetrics(metrics);
-        // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
-        this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
-        currentTimeNanos = new SystemTime().nanoseconds();
-        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
-        this.metricsPerConnection = metricsPerConnection;
-    }
-
-    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
-    }
-
-    /**
-     * Begin connecting to the given address and add the connection to this nioSelector, associated with the given
-     * id.
-     * <p>
-     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
-     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
-     * @param id The id for the new connection
-     * @param address The address to connect to
-     * @param sendBufferSize The send buffer for the new connection
-     * @param receiveBufferSize The receive buffer for the new connection
-     * @throws IllegalStateException if there is already a connection for that id
-     * @throws IOException if DNS resolution fails on the hostname or if the broker is down
-     */
-    @Override
-    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
-        if (this.keys.containsKey(id))
-            throw new IllegalStateException("There is already a connection for id " + id);
-
-        SocketChannel channel = SocketChannel.open();
-        channel.configureBlocking(false);
-        Socket socket = channel.socket();
-        socket.setKeepAlive(true);
-        socket.setSendBufferSize(sendBufferSize);
-        socket.setReceiveBufferSize(receiveBufferSize);
-        socket.setTcpNoDelay(true);
-        try {
-            channel.connect(address);
-        } catch (UnresolvedAddressException e) {
-            channel.close();
-            throw new IOException("Can't resolve address: " + address, e);
-        } catch (IOException e) {
-            channel.close();
-            throw e;
-        }
-        SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
-    }
-
-    /**
-     * Register the nioSelector with an existing channel
-     * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
-     * Note that we are not checking if the connection id is valid - since the connection already exists
-     */
-    public void register(String id, SocketChannel channel) throws ClosedChannelException {
-        SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
-    }
-
-    /**
-     * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
-     * processed until the next {@link #poll(long) poll()} call.
-     */
-    @Override
-    public void disconnect(String id) {
-        SelectionKey key = this.keys.get(id);
-        if (key != null)
-            key.cancel();
-    }
-
-    /**
-     * Interrupt the nioSelector if it is blocked waiting to do I/O.
-     */
-    @Override
-    public void wakeup() {
-        this.nioSelector.wakeup();
-    }
-
-    /**
-     * Close this selector and all associated connections
-     */
-    @Override
-    public void close() {
-        List<String> connections = new LinkedList<String>(keys.keySet());
-        for (String id: connections)
-            close(id);
-
-        try {
-            this.nioSelector.close();
-        } catch (IOException e) {
-            log.error("Exception closing nioSelector:", e);
-        }
-    }
-
-    /**
-     * Queue the given request for sending in the subsequent {@poll(long)} calls
-     * @param send The request to send
-     */
-    public void send(Send send) {
-        SelectionKey key = keyForId(send.destination());
-        Transmissions transmissions = transmissions(key);
-        if (transmissions.hasSend())
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
-        transmissions.send = send;
-        try {
-            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-        } catch (CancelledKeyException e) {
-            close(transmissions.id);
-            this.failedSends.add(send.destination());
-        }
-    }
-
-    /**
-     * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
-     * disconnections, initiating new sends, or making progress on in-progress sends or receives.
-     *
-     * When this call is completed the user can check for completed sends, receives, connections or disconnects using
-     * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
-     * lists are cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if any
-     * I/O completed.
-     *
-     * The time spent selecting and the time spent on I/O are each recorded to the metrics sensors, and an
-     * idle-connection check runs at the end of every call.
-     *
-     * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
-     * @throws IOException if the underlying select call fails; I/O errors on individual connections are caught,
-     *         logged, and reported through {@link #disconnected()} instead
-     */
-    @Override
-    public void poll(long timeout) throws IOException {
-        // reset the result lists of the previous poll
-        clear();
-
-        /* check ready keys */
-        long startSelect = time.nanoseconds();
-        int readyKeys = select(timeout);
-        long endSelect = time.nanoseconds();
-        currentTimeNanos = endSelect;
-        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
-
-        if (readyKeys > 0) {
-            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
-            Iterator<SelectionKey> iter = keys.iterator();
-            while (iter.hasNext()) {
-                SelectionKey key = iter.next();
-                // remove the key from the selected set; it is being handled now
-                iter.remove();
-
-                Transmissions transmissions = transmissions(key);
-                SocketChannel channel = channel(key);
-
-                // register all per-connection metrics at once
-                sensors.maybeRegisterConnectionMetrics(transmissions.id);
-                // remember when this connection was last active, for the idle check at the end
-                lruConnections.put(transmissions.id, currentTimeNanos);
-
-                try {
-                    /* complete any connections that have finished their handshake */
-                    if (key.isConnectable()) {
-                        channel.finishConnect();
-                        // '&' binds tighter than '|': drop OP_CONNECT first, then add OP_READ
-                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
-                        this.connected.add(transmissions.id);
-                        this.sensors.connectionCreated.record();
-                        log.debug("Connection {} created", transmissions.id);
-                    }
-
-                    /* read from any connections that have readable data */
-                    if (key.isReadable()) {
-                        if (!transmissions.hasReceive())
-                            transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
-                        try {
-                            transmissions.receive.readFrom(channel);
-                        } catch (InvalidReceiveException e) {
-                            log.error("Invalid data received from " + transmissions.id + " closing connection", e);
-                            close(transmissions.id);
-                            this.disconnected.add(transmissions.id);
-                            // NOTE(review): if InvalidReceiveException is an IOException, the outer catch below
-                            // would call close() for this id a second time -- confirm the exception hierarchy
-                            throw e;
-                        }
-                        if (transmissions.receive.complete()) {
-                            transmissions.receive.payload().rewind();
-                            this.completedReceives.add(transmissions.receive);
-                            this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
-                            transmissions.clearReceive();
-                        }
-                    }
-
-                    /* write to any sockets that have space in their buffer and for which we have data */
-                    if (key.isWritable()) {
-                        // send() stores the pending send before it adds OP_WRITE to the interest set
-                        transmissions.send.writeTo(channel);
-                        if (transmissions.send.completed()) {
-                            this.completedSends.add(transmissions.send);
-                            this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
-                            transmissions.clearSend();
-                            // nothing left to write: stop asking for write readiness
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                        }
-                    }
-
-                    /* cancel any defunct sockets */
-                    if (!key.isValid()) {
-                        close(transmissions.id);
-                        this.disconnected.add(transmissions.id);
-                    }
-                } catch (IOException e) {
-                    String desc = socketDescription(channel);
-                    // EOF and connect failures are logged at debug level only; anything else is a warning
-                    if (e instanceof EOFException || e instanceof ConnectException)
-                        log.debug("Connection {} disconnected", desc);
-                    else
-                        log.warn("Error in I/O with connection to {}", desc, e);
-                    close(transmissions.id);
-                    this.disconnected.add(transmissions.id);
-                }
-            }
-        }
-        long endIo = time.nanoseconds();
-        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
-        maybeCloseOldestConnection();
-    }
-
-    /**
-     * Builds a short description of the channel's socket for log messages: the remote address when
-     * the socket reports one, otherwise the local address.
-     */
-    private String socketDescription(SocketChannel channel) {
-        Socket sock = channel.socket();
-        if (sock == null)
-            return "[unconnected socket]";
-        if (sock.getInetAddress() == null)
-            return sock.getLocalAddress().toString();
-        return sock.getInetAddress().toString();
-    }
-
-    /**
-     * Returns the sends completed by the most recent poll. This is the internal list itself, which
-     * is emptied at the start of the next poll.
-     */
-    @Override
-    public List<Send> completedSends() {
-        return completedSends;
-    }
-
-    /**
-     * Returns the receives completed by the most recent poll. This is the internal list itself, which
-     * is emptied at the start of the next poll.
-     */
-    @Override
-    public List<NetworkReceive> completedReceives() {
-        return completedReceives;
-    }
-
-    /**
-     * Returns the ids of the connections recorded as disconnected since the start of the most recent
-     * poll. This is the internal list itself, which is reset at the start of the next poll.
-     */
-    @Override
-    public List<String> disconnected() {
-        return disconnected;
-    }
-
-    /**
-     * Returns the ids of the connections that finished connecting during the most recent poll. This is
-     * the internal list itself, which is emptied at the start of the next poll.
-     */
-    @Override
-    public List<String> connected() {
-        return connected;
-    }
-
-    @Override
-    public void mute(String id) {
-        mute(this.keyForId(id));
-    }
-
-    /**
-     * Removes {@code OP_READ} from the key's interest set, leaving all other interest bits untouched.
-     */
-    private void mute(SelectionKey key) {
-        int currentOps = key.interestOps();
-        key.interestOps(currentOps & ~SelectionKey.OP_READ);
-    }
-
-    @Override
-    public void unmute(String id) {
-        unmute(this.keyForId(id));
-    }
-
-    /**
-     * Adds {@code OP_READ} to the key's interest set, leaving all other interest bits untouched.
-     */
-    private void unmute(SelectionKey key) {
-        int currentOps = key.interestOps();
-        key.interestOps(currentOps | SelectionKey.OP_READ);
-    }
-
-    /**
-     * Removes read interest from the key of every known connection.
-     */
-    @Override
-    public void muteAll() {
-        for (SelectionKey each : keys.values()) {
-            mute(each);
-        }
-    }
-
-    /**
-     * Adds read interest back to the key of every known connection.
-     */
-    @Override
-    public void unmuteAll() {
-        for (SelectionKey each : keys.values()) {
-            unmute(each);
-        }
-    }
-
-    /**
-     * Closes at most one connection: the first entry of the LRU map, and only if it has been idle for
-     * longer than the configured maximum. Also schedules the time of the next idle check.
-     */
-    private void maybeCloseOldestConnection() {
-        // not yet time for the next check
-        if (currentTimeNanos <= nextIdleCloseCheckTime)
-            return;
-
-        if (lruConnections.isEmpty()) {
-            nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
-            return;
-        }
-
-        Map.Entry<String, Long> eldest = lruConnections.entrySet().iterator().next();
-        Long lastActiveNanos = eldest.getValue();
-        // the first entry cannot expire before this point in time, so check again then
-        nextIdleCloseCheckTime = lastActiveNanos + connectionsMaxIdleNanos;
-        if (currentTimeNanos <= nextIdleCloseCheckTime)
-            return;
-
-        String connectionId = eldest.getKey();
-        if (log.isTraceEnabled())
-            log.trace("About to close the idle connection from " + connectionId
-                    + " due to being idle for " + (currentTimeNanos - lastActiveNanos) / 1000 / 1000 + " millis");
-
-        disconnected.add(connectionId);
-        close(connectionId);
-    }
-
-    /**
-     * Resets the result lists of the previous poll. Connections whose send failed since then are
-     * carried over into the fresh disconnected list.
-     */
-    private void clear() {
-        completedSends.clear();
-        completedReceives.clear();
-        connected.clear();
-
-        // start the new disconnected list with the sends that failed since the last poll
-        disconnected.clear();
-        disconnected.addAll(failedSends);
-        failedSends.clear();
-    }
-
-    /**
-     * Selects ready keys, waiting according to the given timeout.
-     *
-     * @param ms How long to wait, in milliseconds: zero does not block at all, a negative value blocks
-     *           until at least one key is ready, and a positive value blocks for at most that long.
-     * @return The number of keys ready
-     * @throws IOException if the underlying selector fails
-     */
-    private int select(long ms) throws IOException {
-        if (ms < 0L)
-            return nioSelector.select();
-        if (ms == 0L)
-            return nioSelector.selectNow();
-        return nioSelector.select(ms);
-    }
-
-    /**
-     * Begin closing this connection: forget it in the LRU map and the key map, drop any in-progress
-     * send and receive, cancel its selection key, and close the socket and channel. A failure to close
-     * the socket or channel is logged, not propagated.
-     *
-     * @param id the id of the connection to close
-     * @throws IllegalStateException if there is no connection with the given id
-     */
-    public void close(String id) {
-        SelectionKey key = keyForId(id);
-        lruConnections.remove(id);
-        SocketChannel channel = channel(key);
-        Transmissions trans = transmissions(key);
-        // keys are stored under the same id their attachment carries, so remove by the given id;
-        // this also drops the entry when the key has no attachment
-        this.keys.remove(id);
-        if (trans != null) {
-            trans.clearReceive();
-            trans.clearSend();
-        }
-        key.attach(null);
-        key.cancel();
-        try {
-            channel.socket().close();
-            channel.close();
-        } catch (IOException e) {
-            // log the given id rather than trans.id: trans may be null here
-            log.error("Exception closing connection to node {}:", id, e);
-        }
-        this.sensors.connectionClosed.record();
-    }
-
-    /**
-     * Looks up the selection key registered for the given connection id.
-     *
-     * @throws IllegalStateException if no key is registered under that id
-     */
-    private SelectionKey keyForId(String id) {
-        SelectionKey found = this.keys.get(id);
-        if (found != null)
-            return found;
-        throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
-    }
-
-    /**
-     * Returns the key's attachment, cast to the per-connection transmission state.
-     */
-    private Transmissions transmissions(SelectionKey key) {
-        Object attachment = key.attachment();
-        return (Transmissions) attachment;
-    }
-
-    /**
-     * Returns the channel this selection key was created for, cast to a socket channel.
-     */
-    private SocketChannel channel(SelectionKey key) {
-        Object selectable = key.channel();
-        return (SocketChannel) selectable;
-    }
-
-    /**
-     * Per-connection state attached to a selection key: the connection id plus the send and the
-     * receive currently in progress, each of which is null when nothing is in progress.
-     */
-    private static class Transmissions {
-        public String id;
-        public Send send;
-        public NetworkReceive receive;
-
-        public Transmissions(String id) {
-            this.id = id;
-        }
-
-        /** Whether a send is currently in progress. */
-        public boolean hasSend() {
-            return send != null;
-        }
-
-        /** Whether a receive is currently in progress. */
-        public boolean hasReceive() {
-            return receive != null;
-        }
-
-        /** Forgets the in-progress send. */
-        public void clearSend() {
-            send = null;
-        }
-
-        /** Forgets the in-progress receive. */
-        public void clearReceive() {
-            receive = null;
-        }
-    }
-
-    /**
-     * Holds the sensors this selector records to: connection creation and closing, bytes sent and
-     * received, and the time spent selecting and doing I/O. It can also register sensors per connection.
-     * As a non-static inner class it reads several names that are not declared here
-     * ({@code metricGrpPrefix}, {@code metricTags}, {@code metricsPerConnection}, {@code time}, {@code keys}).
-     */
-    private class SelectorMetrics {
-        private final Metrics metrics;
-        public final Sensor connectionClosed;
-        public final Sensor connectionCreated;
-        public final Sensor bytesTransferred;
-        public final Sensor bytesSent;
-        public final Sensor bytesReceived;
-        public final Sensor selectTime;
-        public final Sensor ioTime;
-
-        /**
-         * Creates all selector-wide sensors in the given registry, attaches their metrics, and adds a
-         * "connection-count" metric that reports the current size of the key map.
-         *
-         * @param metrics the registry in which the sensors and metrics are created
-         */
-        public SelectorMetrics(Metrics metrics) {
-            this.metrics = metrics;
-            String metricGrpName = metricGrpPrefix + "-metrics";
-            StringBuilder tagsSuffix = new StringBuilder();
-
-            // sensor names get a suffix built from the tags: key, "-", value for each tag,
-            // appended back to back with no separator between consecutive tags
-            for (Map.Entry<String, String> tag: metricTags.entrySet()) {
-                tagsSuffix.append(tag.getKey());
-                tagsSuffix.append("-");
-                tagsSuffix.append(tag.getValue());
-            }
-
-            this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
-            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
-            this.connectionClosed.add(metricName, new Rate());
-
-            this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
-            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
-            this.connectionCreated.add(metricName, new Rate());
-
-            this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
-            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
-            bytesTransferred.add(metricName, new Rate(new Count()));
-
-            // bytesTransferred is passed along when creating the sent/received sensors
-            // NOTE(review): presumably this makes it their parent sensor -- confirm against Metrics.sensor
-            this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
-            this.bytesSent.add(metricName, new Rate());
-            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
-            this.bytesSent.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
-            this.bytesSent.add(metricName, new Avg());
-            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
-            this.bytesSent.add(metricName, new Max());
-
-            this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
-            this.bytesReceived.add(metricName, new Rate());
-            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
-            this.bytesReceived.add(metricName, new Rate(new Count()));
-
-            this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
-            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
-            this.selectTime.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
-            this.selectTime.add(metricName, new Avg());
-            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
-            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-
-            this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
-            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
-            this.ioTime.add(metricName, new Avg());
-            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
-            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-
-            // connection-count is not backed by a sensor: it reports the key map's size when measured
-            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
-            this.metrics.addMetric(metricName, new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return keys.size();
-                }
-            });
-        }
-
-        /**
-         * Registers the per-connection sensors ("node-&lt;id&gt;.bytes-sent", ".bytes-received" and
-         * ".latency") for the given connection, unless the id is empty, per-connection metrics are
-         * switched off, or the bytes-sent sensor for that connection already exists.
-         *
-         * @param connectionId the id of the connection to register sensors for
-         */
-        public void maybeRegisterConnectionMetrics(String connectionId) {
-            if (!connectionId.isEmpty() && metricsPerConnection) {
-                // if one sensor of the metrics has been registered for the connection,
-                // then all other sensors should have been registered; and vice versa
-                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest == null) {
-                    String metricGrpName = metricGrpPrefix + "-node-metrics";
-
-                    // per-connection metrics carry the selector's tags plus a "node-id" tag
-                    Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
-                    tags.put("node-id", "node-" + connectionId);
-
-                    nodeRequest = this.metrics.sensor(nodeRequestName);
-                    MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
-                    nodeRequest.add(metricName, new Rate());
-                    metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
-                    nodeRequest.add(metricName, new Rate(new Count()));
-                    metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
-                    nodeRequest.add(metricName, new Avg());
-                    metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
-                    nodeRequest.add(metricName, new Max());
-
-                    String nodeResponseName = "node-" + connectionId + ".bytes-received";
-                    Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
-                    metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
-                    nodeResponse.add(metricName, new Rate());
-                    metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
-                    nodeResponse.add(metricName, new Rate(new Count()));
-
-                    // the latency sensor is only created here; nothing in this class records to it
-                    String nodeTimeName = "node-" + connectionId + ".latency";
-                    Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
-                    metricName = new MetricName("request-latency-avg", metricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Avg());
-                    metricName = new MetricName("request-latency-max", metricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Max());
-                }
-            }
-        }
-
-        /**
-         * Records the given number of sent bytes on the selector-wide bytes-sent sensor and, when the
-         * connection id is non-empty and a per-connection sensor exists for it, on that sensor as well.
-         *
-         * @param connectionId the id of the connection the bytes were sent on
-         * @param bytes the number of bytes sent
-         */
-        public void recordBytesSent(String connectionId, long bytes) {
-            long now = time.milliseconds();
-            this.bytesSent.record(bytes, now);
-            if (!connectionId.isEmpty()) {
-                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null)
-                    nodeRequest.record(bytes, now);
-            }
-        }
-
-        /**
-         * Records the given number of received bytes on the selector-wide bytes-received sensor and, when
-         * the connection id is non-empty and a per-connection sensor exists for it, on that sensor as well.
-         *
-         * @param connection the id of the connection the bytes were received on
-         * @param bytes the number of bytes received
-         */
-        public void recordBytesReceived(String connection, int bytes) {
-            long now = time.milliseconds();
-            this.bytesReceived.record(bytes, now);
-            if (!connection.isEmpty()) {
-                String nodeRequestName = "node-" + connection + ".bytes-received";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null)
-                    nodeRequest.record(bytes, now);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
deleted file mode 100644
index b9e8a50..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Models the in-progress sending of data to a destination that is identified by a string id.
- */
-public interface Send {
-
-    /**
-     * Returns the id of the destination of this send.
-     */
-    String destination();
-
-    /**
-     * Reports whether this send is complete.
-     */
-    boolean completed();
-
-    /**
-     * Writes some of the bytes of this send that have not been written yet to the given channel.
-     * Several calls may be needed before the send has been written completely.
-     * @param channel The channel to write to
-     * @return The number of bytes written
-     * @throws IOException If the write fails
-     */
-    long writeTo(GatheringByteChannel channel) throws IOException;
-
-    /**
-     * Returns the size of the send.
-     */
-    long size();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
deleted file mode 100644
index e12261c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Identifiers for all the Kafka APIs
- */
-public enum ApiKeys {
-    PRODUCE(0, "Produce"),
-    FETCH(1, "Fetch"),
-    LIST_OFFSETS(2, "Offsets"),
-    METADATA(3, "Metadata"),
-    LEADER_AND_ISR(4, "LeaderAndIsr"),
-    STOP_REPLICA(5, "StopReplica"),
-    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
-    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
-    OFFSET_COMMIT(8, "OffsetCommit"),
-    OFFSET_FETCH(9, "OffsetFetch"),
-    CONSUMER_METADATA(10, "ConsumerMetadata"),
-    JOIN_GROUP(11, "JoinGroup"),
-    HEARTBEAT(12, "Heartbeat");
-
-    private static ApiKeys[] codeToType;
-    public static final int MAX_API_KEY;
-
-    static {
-        int maxKey = -1;
-        for (ApiKeys key : ApiKeys.values()) {
-            maxKey = Math.max(maxKey, key.id);
-        }
-        codeToType = new ApiKeys[maxKey + 1];
-        for (ApiKeys key : ApiKeys.values()) {
-            codeToType[key.id] = key;
-        }
-        MAX_API_KEY = maxKey;
-    }
-
-    /** the perminant and immutable id of an API--this can't change ever */
-    public final short id;
-
-    /** an english description of the api--this is for debugging and can change */
-    public final String name;
-
-    private ApiKeys(int id, String name) {
-        this.id = (short) id;
-        this.name = name;
-    }
-
-    public static ApiKeys forId(int id) {
-        return codeToType[id];
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
deleted file mode 100644
index 5ef3b24..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-import org.apache.flink.kafka_backport.common.errors.ApiException;
-import org.apache.flink.kafka_backport.common.errors.IllegalGenerationException;
-import org.apache.flink.kafka_backport.common.errors.InvalidRequiredAcksException;
-import org.apache.flink.kafka_backport.common.errors.InvalidTopicException;
-import org.apache.flink.kafka_backport.common.errors.LeaderNotAvailableException;
-import org.apache.flink.kafka_backport.common.errors.NetworkException;
-import org.apache.flink.kafka_backport.common.errors.NotCoordinatorForConsumerException;
-import org.apache.flink.kafka_backport.common.errors.NotEnoughReplicasException;
-import org.apache.flink.kafka_backport.common.errors.NotLeaderForPartitionException;
-import org.apache.flink.kafka_backport.common.errors.OffsetLoadInProgressException;
-import org.apache.flink.kafka_backport.common.errors.OffsetMetadataTooLarge;
-import org.apache.flink.kafka_backport.common.errors.OffsetOutOfRangeException;
-import org.apache.flink.kafka_backport.common.errors.RecordBatchTooLargeException;
-import org.apache.flink.kafka_backport.common.errors.TimeoutException;
-import org.apache.flink.kafka_backport.common.errors.UnknownConsumerIdException;
-import org.apache.flink.kafka_backport.common.errors.UnknownServerException;
-import org.apache.flink.kafka_backport.common.errors.UnknownTopicOrPartitionException;
-import org.apache.flink.kafka_backport.common.errors.ConsumerCoordinatorNotAvailableException;
-import org.apache.flink.kafka_backport.common.errors.CorruptRecordException;
-import org.apache.flink.kafka_backport.common.errors.NotEnoughReplicasAfterAppendException;
-import org.apache.flink.kafka_backport.common.errors.RecordTooLargeException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
- * are thus part of the protocol. The names can be changed but the error code cannot.
- * 
- * Do not add exceptions that occur only on the client or only on the server here.
- */
-public enum Errors {
-    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
-    NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1,
-            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2,
-            new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
-    UNKNOWN_TOPIC_OR_PARTITION(3,
-            new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    // TODO: errorCode 4 for InvalidFetchSize
-    LEADER_NOT_AVAILABLE(5,
-            new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
-    NOT_LEADER_FOR_PARTITION(6,
-            new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
-    REQUEST_TIMED_OUT(7,
-            new TimeoutException("The request timed out.")),
-    // TODO: errorCode 8 for BrokerNotAvailable
-    REPLICA_NOT_AVAILABLE(9,
-            new ApiException("The replica is not available for the requested topic-partition")),
-    MESSAGE_TOO_LARGE(10,
-            new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
-    OFFSET_METADATA_TOO_LARGE(12,
-            new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13,
-            new NetworkException("The server disconnected before a response was received.")),
-    OFFSET_LOAD_IN_PROGRESS(14,
-            new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
-    CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
-            new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")),
-    NOT_COORDINATOR_FOR_CONSUMER(16,
-            new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")),
-    INVALID_TOPIC_EXCEPTION(17,
-            new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
-    RECORD_LIST_TOO_LARGE(18,
-            new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
-    NOT_ENOUGH_REPLICAS(19,
-            new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
-    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
-            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
-    INVALID_REQUIRED_ACKS(21,
-            new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
-    ILLEGAL_GENERATION(22,
-            new IllegalGenerationException("Specified consumer generation id is not valid.")),
-    INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23,
-            new ApiException("The request partition assignment strategy does not match that of the group.")),
-    UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24,
-            new ApiException("The request partition assignment strategy is unknown to the broker.")),
-    UNKNOWN_CONSUMER_ID(25,
-            new UnknownConsumerIdException("The coordinator is not aware of this consumer.")),
-    INVALID_SESSION_TIMEOUT(26,
-            new ApiException("The session timeout is not within an acceptable range.")),
-    COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
-            new ApiException("Some of the committing partitions are not assigned the committer")),
-    INVALID_COMMIT_OFFSET_SIZE(28,
-            new ApiException("The committing offset data size is not valid"));
-
-    private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
-    private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
-
-    static {
-        for (Errors error : Errors.values()) {
-            codeToError.put(error.code(), error);
-            if (error.exception != null)
-                classToError.put(error.exception.getClass(), error);
-        }
-    }
-
-    private final short code;
-    private final ApiException exception;
-
-    private Errors(int code, ApiException exception) {
-        this.code = (short) code;
-        this.exception = exception;
-    }
-
-    /**
-     * An instance of the exception
-     */
-    public ApiException exception() {
-        return this.exception;
-    }
-
-    /**
-     * The error code for the exception
-     */
-    public short code() {
-        return this.code;
-    }
-
-    /**
-     * Throw the exception corresponding to this error if there is one
-     */
-    public void maybeThrow() {
-        if (exception != null) {
-            throw this.exception;
-        }
-    }
-
-    /**
-     * Throw the exception if there is one
-     */
-    public static Errors forCode(short code) {
-        Errors error = codeToError.get(code);
-        return error == null ? UNKNOWN : error;
-    }
-
-    /**
-     * Return the error instance associated with this exception (or UKNOWN if there is none)
-     */
-    public static Errors forException(Throwable t) {
-        Errors error = classToError.get(t.getClass());
-        return error == null ? UNKNOWN : error;
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
deleted file mode 100644
index e5ae9a4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Static helpers that pick request and response schemas out of the tables in {@link Protocol}
- * and use them to read structs from a buffer.
- */
-public class ProtoUtils {
-
-    /**
-     * Selects one schema from a table that is indexed first by api key and then by version.
-     *
-     * @param schemas the table to look in, indexed as {@code schemas[apiKey][version]}
-     * @param apiKey the api key, used as the first index
-     * @param version the version, used as the second index
-     * @return the entry stored at {@code schemas[apiKey][version]}
-     * @throws IllegalArgumentException if either index falls outside its array
-     */
-    private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
-        // Valid indices end at length - 1, so an index equal to length must be rejected too.
-        if (apiKey < 0 || apiKey >= schemas.length) {
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        }
-        Schema[] versions = schemas[apiKey];
-        if (version < 0 || version >= versions.length) {
-            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
-        }
-        return versions[version];
-    }
-
-    /**
-     * Returns the entry of {@code Protocol.CURR_VERSION} for the given api key.
-     *
-     * @param apiKey the api key, used as the index into {@code Protocol.CURR_VERSION}
-     * @return the version stored for that api key
-     * @throws IllegalArgumentException if the api key falls outside {@code Protocol.CURR_VERSION}
-     */
-    public static short latestVersion(int apiKey) {
-        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length) {
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        }
-        return Protocol.CURR_VERSION[apiKey];
-    }
-
-    /**
-     * Returns the request schema for the given api key and version.
-     *
-     * @throws IllegalArgumentException if the api key or version falls outside {@code Protocol.REQUESTS}
-     */
-    public static Schema requestSchema(int apiKey, int version) {
-        return schemaFor(Protocol.REQUESTS, apiKey, version);
-    }
-
-    /**
-     * Returns the request schema for the given api key at the version reported by
-     * {@link #latestVersion(int)}.
-     */
-    public static Schema currentRequestSchema(int apiKey) {
-        return requestSchema(apiKey, latestVersion(apiKey));
-    }
-
-    /**
-     * Returns the response schema for the given api key and version.
-     *
-     * @throws IllegalArgumentException if the api key or version falls outside {@code Protocol.RESPONSES}
-     */
-    public static Schema responseSchema(int apiKey, int version) {
-        return schemaFor(Protocol.RESPONSES, apiKey, version);
-    }
-
-    /**
-     * Returns the response schema for the given api key at the version reported by
-     * {@link #latestVersion(int)}.
-     */
-    public static Schema currentResponseSchema(int apiKey) {
-        return responseSchema(apiKey, latestVersion(apiKey));
-    }
-
-    /**
-     * Reads a struct from the buffer using the request schema for the given api key and version.
-     */
-    public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
-        return (Struct) requestSchema(apiKey, version).read(buffer);
-    }
-
-    /**
-     * Reads a struct from the buffer using the response schema for the given api key. The
-     * version is always the one reported by {@link #latestVersion(int)}; it cannot be chosen here.
-     */
-    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
-        return (Struct) currentResponseSchema(apiKey).read(buffer);
-    }
-
-}