Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:34 UTC

[17/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
new file mode 100644
index 0000000..c0d5d99
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.network;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A size-delimited Receive that consists of a 4-byte network-ordered size N followed by N bytes of content
+ */
+public class NetworkReceive implements Receive {
+
+    public final static String UNKNOWN_SOURCE = "";
+    public final static int UNLIMITED = -1;
+
+    private final String source;
+    private final ByteBuffer size;
+    private final int maxSize;
+    private ByteBuffer buffer;
+
+
+    public NetworkReceive(String source, ByteBuffer buffer) {
+        this.source = source;
+        this.buffer = buffer;
+        this.size = null;
+        this.maxSize = UNLIMITED;
+    }
+
+    public NetworkReceive(String source) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = UNLIMITED;
+    }
+
+    public NetworkReceive(int maxSize, String source) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = maxSize;
+    }
+
+    public NetworkReceive() {
+        this(UNKNOWN_SOURCE);
+    }
+
+    @Override
+    public String source() {
+        return source;
+    }
+
+    @Override
+    public boolean complete() {
+        return (size == null || !size.hasRemaining()) && buffer != null && !buffer.hasRemaining(); // null-safe for the buffer-only constructor
+    }
+
+    public long readFrom(ScatteringByteChannel channel) throws IOException {
+        return readFromReadableChannel(channel);
+    }
+
+    // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
+    // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
+    // This can go away after we get rid of BlockingChannel
+    @Deprecated
+    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
+        int read = 0;
+        if (size.hasRemaining()) {
+            int bytesRead = channel.read(size);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+            if (!size.hasRemaining()) {
+                size.rewind();
+                int receiveSize = size.getInt();
+                if (receiveSize < 0)
+                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
+                if (maxSize != UNLIMITED && receiveSize > maxSize)
+                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
+                this.buffer = ByteBuffer.allocate(receiveSize);
+            }
+        }
+        if (buffer != null) {
+            int bytesRead = channel.read(buffer);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+        }
+
+        return read;
+    }
+
+    public ByteBuffer payload() {
+        return this.buffer;
+    }
+
+    // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel
+    @Deprecated
+    public long readCompletely(ReadableByteChannel channel) throws IOException {
+        int totalRead = 0;
+        while (!complete()) {
+            totalRead += readFromReadableChannel(channel);
+        }
+        return totalRead;
+    }
+
+}
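
The framing parsed here is nothing more than a 4-byte big-endian length followed by that many
bytes of body. As a minimal sketch (not part of the commit; the demo class, in-memory pipe, and
payload are illustrative), a frame can be written by hand and drained back through a NetworkReceive:

    import org.apache.flink.kafka_backport.common.network.NetworkReceive;

    import java.nio.ByteBuffer;
    import java.nio.channels.Pipe;
    import java.nio.charset.StandardCharsets;

    public class NetworkReceiveDemo {
        public static void main(String[] args) throws Exception {
            byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
            ByteBuffer frame = ByteBuffer.allocate(4 + body.length);
            frame.putInt(body.length);           // ByteBuffer defaults to network (big-endian) order
            frame.put(body);
            frame.flip();

            Pipe pipe = Pipe.open();             // in-memory channel pair, stands in for a socket
            pipe.sink().write(frame);

            NetworkReceive receive = new NetworkReceive("demo-source");
            while (!receive.complete())
                receive.readFrom(pipe.source()); // Pipe.SourceChannel is a ScatteringByteChannel

            ByteBuffer payload = receive.payload();
            payload.rewind();                    // position sits at the limit once complete
            byte[] out = new byte[payload.remaining()];
            payload.get(out);
            System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "hello"
        }
    }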

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
new file mode 100644
index 0000000..29ce09d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.network;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A size-delimited Send that consists of a 4-byte network-ordered size N followed by N bytes of content
+ */
+public class NetworkSend extends ByteBufferSend {
+
+    public NetworkSend(String destination, ByteBuffer... buffers) {
+        super(destination, sizeDelimit(buffers));
+    }
+
+    private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
+        int size = 0;
+        for (int i = 0; i < buffers.length; i++)
+            size += buffers[i].remaining();
+        ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
+        delimited[0] = ByteBuffer.allocate(4);
+        delimited[0].putInt(size);
+        delimited[0].rewind();
+        System.arraycopy(buffers, 0, delimited, 1, buffers.length);
+        return delimited;
+    }
+
+}
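
Because sizeDelimit() prepends exactly the header that NetworkReceive strips, the two classes
round-trip through any channel pair. A hedged fragment (the destination name and in-memory pipe
are illustrative; writeTo(GatheringByteChannel) and completed() are the Send contract shown
further below in this commit):

    Pipe pipe = Pipe.open();
    NetworkSend send = new NetworkSend("node-1",
            ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
    while (!send.completed())
        send.writeTo(pipe.sink());           // the 4-byte header buffer first, then the body

    NetworkReceive receive = new NetworkReceive("node-1");
    while (!receive.complete())
        receive.readFrom(pipe.source());     // payload() now holds "ping"; rewind before reading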

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
new file mode 100644
index 0000000..b799e7c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.network;
+
+import java.io.IOException;
+import java.nio.channels.ScatteringByteChannel;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This interface models the in-progress reading of data from a channel to a source identified by a string id
+ */
+public interface Receive {
+
+    /**
+     * The string id of the source from which we are receiving data.
+     */
+    public String source();
+
+    /**
+     * Are we done receiving data?
+     */
+    public boolean complete();
+
+    /**
+     * Read bytes into this receive from the given channel
+     * @param channel The channel to read from
+     * @return The number of bytes read
+     * @throws IOException If the reading fails
+     */
+    public long readFrom(ScatteringByteChannel channel) throws IOException;
+
+}
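
Implementations are driven by calling readFrom() repeatedly until complete() reports true; a
short sketch of the usual drain loop (readCompletely() in NetworkReceive above is this same
pattern):

    long total = 0;
    while (!receive.complete())
        total += receive.readFrom(channel);  // channel is any ScatteringByteChannel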

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
new file mode 100644
index 0000000..08da141
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.network;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * An interface for asynchronous, multi-channel network I/O
+ */
+public interface Selectable {
+
+    /**
+     * Begin establishing a socket connection to the given address, identified by the given id
+     * @param id The id for this connection
+     * @param address The address to connect to
+     * @param sendBufferSize The send buffer for the socket
+     * @param receiveBufferSize The receive buffer for the socket
+     * @throws IOException If we cannot begin connecting
+     */
+    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
+
+    /**
+     * Begin disconnecting the connection identified by the given id
+     */
+    public void disconnect(String id);
+
+    /**
+     * Wakeup this selector if it is blocked on I/O
+     */
+    public void wakeup();
+
+    /**
+     * Close this selector
+     */
+    public void close();
+
+    /**
+     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
+     * @param send The request to send
+     */
+    public void send(Send send);
+
+    /**
+     * Do I/O. Reads, writes, connection establishment, etc.
+     * @param timeout The amount of time to block if there is nothing to do
+     * @throws IOException
+     */
+    public void poll(long timeout) throws IOException;
+
+    /**
+     * The list of sends that completed on the last {@link #poll(long) poll()} call.
+     */
+    public List<Send> completedSends();
+
+    /**
+     * The list of receives that completed on the last {@link #poll(long) poll()} call.
+     */
+    public List<NetworkReceive> completedReceives();
+
+    /**
+     * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
+     * call.
+     */
+    public List<String> disconnected();
+
+    /**
+     * The list of connections that completed their connection on the last {@link #poll(long) poll()}
+     * call.
+     */
+    public List<String> connected();
+
+    /**
+     * Disable reads from the given connection
+     * @param id The id for the connection
+     */
+    public void mute(String id);
+
+    /**
+     * Re-enable reads from the given connection
+     * @param id The id for the connection
+     */
+    public void unmute(String id);
+
+    /**
+     * Disable reads from all connections
+     */
+    public void muteAll();
+
+    /**
+     * Re-enable reads from all connections
+     */
+    public void unmuteAll();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
new file mode 100644
index 0000000..a886e3b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
@@ -0,0 +1,664 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.network;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+import org.apache.flink.kafka_backport.common.metrics.Measurable;
+import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.apache.flink.kafka_backport.common.metrics.Metrics;
+import org.apache.flink.kafka_backport.common.metrics.Sensor;
+import org.apache.flink.kafka_backport.common.metrics.stats.Avg;
+import org.apache.flink.kafka_backport.common.metrics.stats.Count;
+import org.apache.flink.kafka_backport.common.metrics.stats.Max;
+import org.apache.flink.kafka_backport.common.metrics.stats.Rate;
+import org.apache.flink.kafka_backport.common.utils.SystemTime;
+import org.apache.flink.kafka_backport.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A nioSelector interface for doing non-blocking multi-connection network I/O.
+ * <p>
+ * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
+ * responses.
+ * <p>
+ * A connection can be added to the nioSelector, associated with a string id, by doing
+ * 
+ * <pre>
+ * nioSelector.connect(&quot;42&quot;, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
+ * </pre>
+ * 
+ * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
+ * the connection. The successful invocation of this method does not mean a valid connection has been established.
+ * 
+ * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
+ * connections are all done using the <code>poll()</code> call.
+ * 
+ * <pre>
+ * nioSelector.send(new NetworkSend(&quot;42&quot;, myBytes));
+ * nioSelector.poll(TIMEOUT_MS);
+ * </pre>
+ * 
+ * The nioSelector maintains several lists that are available via various getters. These are reset by each call to
+ * <code>poll()</code>.
+ * 
+ * This class is not thread safe!
+ */
+public class Selector implements Selectable {
+
+    private static final Logger log = LoggerFactory.getLogger(Selector.class);
+
+    private final java.nio.channels.Selector nioSelector;
+    private final Map<String, SelectionKey> keys;
+    private final List<Send> completedSends;
+    private final List<NetworkReceive> completedReceives;
+    private final List<String> disconnected;
+    private final List<String> connected;
+    private final List<String> failedSends;
+    private final Time time;
+    private final SelectorMetrics sensors;
+    private final String metricGrpPrefix;
+    private final Map<String, String> metricTags;
+    private final Map<String, Long> lruConnections;
+    private final long connectionsMaxIdleNanos;
+    private final int maxReceiveSize;
+    private final boolean metricsPerConnection;
+    private long currentTimeNanos;
+    private long nextIdleCloseCheckTime;
+
+
+    /**
+     * Create a new nioSelector
+     */
+    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
+        try {
+            this.nioSelector = java.nio.channels.Selector.open();
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+        this.maxReceiveSize = maxReceiveSize;
+        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
+        this.time = time;
+        this.metricGrpPrefix = metricGrpPrefix;
+        this.metricTags = metricTags;
+        this.keys = new HashMap<String, SelectionKey>();
+        this.completedSends = new ArrayList<Send>();
+        this.completedReceives = new ArrayList<NetworkReceive>();
+        this.connected = new ArrayList<String>();
+        this.disconnected = new ArrayList<String>();
+        this.failedSends = new ArrayList<String>();
+        this.sensors = new SelectorMetrics(metrics);
+        // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
+        this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
+        currentTimeNanos = new SystemTime().nanoseconds();
+        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
+        this.metricsPerConnection = metricsPerConnection;
+    }
+
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
+    }
+
+    /**
+     * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
+     * number.
+     * <p>
+     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
+     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
+     * @param id The id for the new connection
+     * @param address The address to connect to
+     * @param sendBufferSize The send buffer for the new connection
+     * @param receiveBufferSize The receive buffer for the new connection
+     * @throws IllegalStateException if there is already a connection for that id
+     * @throws IOException if DNS resolution fails on the hostname or if the broker is down
+     */
+    @Override
+    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+        if (this.keys.containsKey(id))
+            throw new IllegalStateException("There is already a connection for id " + id);
+
+        SocketChannel channel = SocketChannel.open();
+        channel.configureBlocking(false);
+        Socket socket = channel.socket();
+        socket.setKeepAlive(true);
+        socket.setSendBufferSize(sendBufferSize);
+        socket.setReceiveBufferSize(receiveBufferSize);
+        socket.setTcpNoDelay(true);
+        try {
+            channel.connect(address);
+        } catch (UnresolvedAddressException e) {
+            channel.close();
+            throw new IOException("Can't resolve address: " + address, e);
+        } catch (IOException e) {
+            channel.close();
+            throw e;
+        }
+        SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
+        key.attach(new Transmissions(id));
+        this.keys.put(id, key);
+    }
+
+    /**
+     * Register the nioSelector with an existing channel
+     * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
+     * Note that we are not checking if the connection id is valid - since the connection already exists
+     */
+    public void register(String id, SocketChannel channel) throws ClosedChannelException {
+        SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
+        key.attach(new Transmissions(id));
+        this.keys.put(id, key);
+    }
+
+    /**
+     * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
+     * processed until the next {@link #poll(long) poll()} call.
+     */
+    @Override
+    public void disconnect(String id) {
+        SelectionKey key = this.keys.get(id);
+        if (key != null)
+            key.cancel();
+    }
+
+    /**
+     * Interrupt the nioSelector if it is blocked waiting to do I/O.
+     */
+    @Override
+    public void wakeup() {
+        this.nioSelector.wakeup();
+    }
+
+    /**
+     * Close this selector and all associated connections
+     */
+    @Override
+    public void close() {
+        List<String> connections = new LinkedList<String>(keys.keySet());
+        for (String id: connections)
+            close(id);
+
+        try {
+            this.nioSelector.close();
+        } catch (IOException e) {
+            log.error("Exception closing nioSelector:", e);
+        }
+    }
+
+    /**
+     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
+     * @param send The request to send
+     */
+    public void send(Send send) {
+        SelectionKey key = keyForId(send.destination());
+        Transmissions transmissions = transmissions(key);
+        if (transmissions.hasSend())
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+        transmissions.send = send;
+        try {
+            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        } catch (CancelledKeyException e) {
+            close(transmissions.id);
+            this.failedSends.add(send.destination());
+        }
+    }
+
+    /**
+     * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
+     * disconnections, initiating new sends, or making progress on in-progress sends or receives.
+     * 
+     * When this call is completed the user can check for completed sends, receives, connections or disconnects using
+     * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
+     * lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if any I/O
+     * completed.
+     * 
+     * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
+     * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
+     *         already an in-progress send
+     */
+    @Override
+    public void poll(long timeout) throws IOException {
+        clear();
+
+        /* check ready keys */
+        long startSelect = time.nanoseconds();
+        int readyKeys = select(timeout);
+        long endSelect = time.nanoseconds();
+        currentTimeNanos = endSelect;
+        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
+
+        if (readyKeys > 0) {
+            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
+            Iterator<SelectionKey> iter = keys.iterator();
+            while (iter.hasNext()) {
+                SelectionKey key = iter.next();
+                iter.remove();
+
+                Transmissions transmissions = transmissions(key);
+                SocketChannel channel = channel(key);
+
+                // register all per-connection metrics at once
+                sensors.maybeRegisterConnectionMetrics(transmissions.id);
+                lruConnections.put(transmissions.id, currentTimeNanos);
+
+                try {
+                    /* complete any connections that have finished their handshake */
+                    if (key.isConnectable()) {
+                        channel.finishConnect();
+                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+                        this.connected.add(transmissions.id);
+                        this.sensors.connectionCreated.record();
+                        log.debug("Connection {} created", transmissions.id);
+                    }
+
+                    /* read from any connections that have readable data */
+                    if (key.isReadable()) {
+                        if (!transmissions.hasReceive())
+                            transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
+                        try {
+                            transmissions.receive.readFrom(channel);
+                        } catch (InvalidReceiveException e) {
+                            log.error("Invalid data received from " + transmissions.id + " closing connection", e);
+                            close(transmissions.id);
+                            this.disconnected.add(transmissions.id);
+                            throw e;
+                        }
+                        if (transmissions.receive.complete()) {
+                            transmissions.receive.payload().rewind();
+                            this.completedReceives.add(transmissions.receive);
+                            this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
+                            transmissions.clearReceive();
+                        }
+                    }
+
+                    /* write to any sockets that have space in their buffer and for which we have data */
+                    if (key.isWritable()) {
+                        transmissions.send.writeTo(channel);
+                        if (transmissions.send.completed()) {
+                            this.completedSends.add(transmissions.send);
+                            this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
+                            transmissions.clearSend();
+                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                        }
+                    }
+
+                    /* cancel any defunct sockets */
+                    if (!key.isValid()) {
+                        close(transmissions.id);
+                        this.disconnected.add(transmissions.id);
+                    }
+                } catch (IOException e) {
+                    String desc = socketDescription(channel);
+                    if (e instanceof EOFException || e instanceof ConnectException)
+                        log.debug("Connection {} disconnected", desc);
+                    else
+                        log.warn("Error in I/O with connection to {}", desc, e);
+                    close(transmissions.id);
+                    this.disconnected.add(transmissions.id);
+                }
+            }
+        }
+        long endIo = time.nanoseconds();
+        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
+        maybeCloseOldestConnection();
+    }
+
+    private String socketDescription(SocketChannel channel) {
+        Socket socket = channel.socket();
+        if (socket == null)
+            return "[unconnected socket]";
+        else if (socket.getInetAddress() != null)
+            return socket.getInetAddress().toString();
+        else
+            return socket.getLocalAddress().toString();
+    }
+
+    @Override
+    public List<Send> completedSends() {
+        return this.completedSends;
+    }
+
+    @Override
+    public List<NetworkReceive> completedReceives() {
+        return this.completedReceives;
+    }
+
+    @Override
+    public List<String> disconnected() {
+        return this.disconnected;
+    }
+
+    @Override
+    public List<String> connected() {
+        return this.connected;
+    }
+
+    @Override
+    public void mute(String id) {
+        mute(this.keyForId(id));
+    }
+
+    private void mute(SelectionKey key) {
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+    }
+
+    @Override
+    public void unmute(String id) {
+        unmute(this.keyForId(id));
+    }
+
+    private void unmute(SelectionKey key) {
+        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+    }
+
+    @Override
+    public void muteAll() {
+        for (SelectionKey key : this.keys.values())
+            mute(key);
+    }
+
+    @Override
+    public void unmuteAll() {
+        for (SelectionKey key : this.keys.values())
+            unmute(key);
+    }
+
+    private void maybeCloseOldestConnection() {
+        if (currentTimeNanos > nextIdleCloseCheckTime) {
+            if (lruConnections.isEmpty()) {
+                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
+            } else {
+                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
+                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
+                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
+                if (currentTimeNanos > nextIdleCloseCheckTime) {
+                    String connectionId = oldestConnectionEntry.getKey();
+                    if (log.isTraceEnabled())
+                        log.trace("About to close the idle connection from " + connectionId
+                                + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
+
+                    disconnected.add(connectionId);
+                    close(connectionId);
+                }
+            }
+        }
+    }
+
+    /**
+     * Clear the results from the prior poll
+     */
+    private void clear() {
+        this.completedSends.clear();
+        this.completedReceives.clear();
+        this.connected.clear();
+        this.disconnected.clear();
+        this.disconnected.addAll(this.failedSends);
+        this.failedSends.clear();
+    }
+
+    /**
+     * Check for data, waiting up to the given timeout.
+     * 
+     * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
+     * @return The number of keys ready
+     * @throws IOException
+     */
+    private int select(long ms) throws IOException {
+        if (ms == 0L)
+            return this.nioSelector.selectNow();
+        else if (ms < 0L)
+            return this.nioSelector.select();
+        else
+            return this.nioSelector.select(ms);
+    }
+
+    /**
+     * Begin closing this connection
+     */
+    public void close(String id) {
+        SelectionKey key = keyForId(id);
+        lruConnections.remove(id);
+        SocketChannel channel = channel(key);
+        Transmissions trans = transmissions(key);
+        if (trans != null) {
+            this.keys.remove(trans.id);
+            trans.clearReceive();
+            trans.clearSend();
+        }
+        key.attach(null);
+        key.cancel();
+        try {
+            channel.socket().close();
+            channel.close();
+        } catch (IOException e) {
+            log.error("Exception closing connection to node {}:", trans.id, e);
+        }
+        this.sensors.connectionClosed.record();
+    }
+
+    /**
+     * Get the selection key associated with this numeric id
+     */
+    private SelectionKey keyForId(String id) {
+        SelectionKey key = this.keys.get(id);
+        if (key == null)
+            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
+        return key;
+    }
+
+    /**
+     * Get the transmissions for the given connection
+     */
+    private Transmissions transmissions(SelectionKey key) {
+        return (Transmissions) key.attachment();
+    }
+
+    /**
+     * Get the socket channel associated with this selection key
+     */
+    private SocketChannel channel(SelectionKey key) {
+        return (SocketChannel) key.channel();
+    }
+
+    /**
+     * The id and in-progress send and receive associated with a connection
+     */
+    private static class Transmissions {
+        public String id;
+        public Send send;
+        public NetworkReceive receive;
+
+        public Transmissions(String id) {
+            this.id = id;
+        }
+
+        public boolean hasSend() {
+            return this.send != null;
+        }
+
+        public void clearSend() {
+            this.send = null;
+        }
+
+        public boolean hasReceive() {
+            return this.receive != null;
+        }
+
+        public void clearReceive() {
+            this.receive = null;
+        }
+    }
+
+    private class SelectorMetrics {
+        private final Metrics metrics;
+        public final Sensor connectionClosed;
+        public final Sensor connectionCreated;
+        public final Sensor bytesTransferred;
+        public final Sensor bytesSent;
+        public final Sensor bytesReceived;
+        public final Sensor selectTime;
+        public final Sensor ioTime;
+
+        public SelectorMetrics(Metrics metrics) {
+            this.metrics = metrics;
+            String metricGrpName = metricGrpPrefix + "-metrics";
+            StringBuilder tagsSuffix = new StringBuilder();
+
+            for (Map.Entry<String, String> tag: metricTags.entrySet()) {
+                tagsSuffix.append(tag.getKey());
+                tagsSuffix.append("-");
+                tagsSuffix.append(tag.getValue());
+            }
+
+            this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
+            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
+            this.connectionClosed.add(metricName, new Rate());
+
+            this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
+            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
+            this.connectionCreated.add(metricName, new Rate());
+
+            this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
+            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
+            bytesTransferred.add(metricName, new Rate(new Count()));
+
+            this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
+            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
+            this.bytesSent.add(metricName, new Rate());
+            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
+            this.bytesSent.add(metricName, new Rate(new Count()));
+            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
+            this.bytesSent.add(metricName, new Avg());
+            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
+            this.bytesSent.add(metricName, new Max());
+
+            this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
+            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
+            this.bytesReceived.add(metricName, new Rate());
+            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
+            this.bytesReceived.add(metricName, new Rate(new Count()));
+
+            this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
+            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
+            this.selectTime.add(metricName, new Rate(new Count()));
+            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
+            this.selectTime.add(metricName, new Avg());
+            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
+            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+
+            this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
+            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
+            this.ioTime.add(metricName, new Avg());
+            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
+            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+
+            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
+            this.metrics.addMetric(metricName, new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return keys.size();
+                }
+            });
+        }
+
+        public void maybeRegisterConnectionMetrics(String connectionId) {
+            if (!connectionId.isEmpty() && metricsPerConnection) {
+                // if one sensor of the metrics has been registered for the connection,
+                // then all other sensors should have been registered; and vice versa
+                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
+                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+                if (nodeRequest == null) {
+                    String metricGrpName = metricGrpPrefix + "-node-metrics";
+
+                    Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
+                    tags.put("node-id", "node-" + connectionId);
+
+                    nodeRequest = this.metrics.sensor(nodeRequestName);
+                    MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
+                    nodeRequest.add(metricName, new Rate());
+                    metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
+                    nodeRequest.add(metricName, new Rate(new Count()));
+                    metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
+                    nodeRequest.add(metricName, new Avg());
+                    metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
+                    nodeRequest.add(metricName, new Max());
+
+                    String nodeResponseName = "node-" + connectionId + ".bytes-received";
+                    Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
+                    metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
+                    nodeResponse.add(metricName, new Rate());
+                    metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
+                    nodeResponse.add(metricName, new Rate(new Count()));
+
+                    String nodeTimeName = "node-" + connectionId + ".latency";
+                    Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
+                    metricName = new MetricName("request-latency-avg", metricGrpName, tags);
+                    nodeRequestTime.add(metricName, new Avg());
+                    metricName = new MetricName("request-latency-max", metricGrpName, tags);
+                    nodeRequestTime.add(metricName, new Max());
+                }
+            }
+        }
+
+        public void recordBytesSent(String connectionId, long bytes) {
+            long now = time.milliseconds();
+            this.bytesSent.record(bytes, now);
+            if (!connectionId.isEmpty()) {
+                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
+                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+                if (nodeRequest != null)
+                    nodeRequest.record(bytes, now);
+            }
+        }
+
+        public void recordBytesReceived(String connection, int bytes) {
+            long now = time.milliseconds();
+            this.bytesReceived.record(bytes, now);
+            if (!connection.isEmpty()) {
+                String nodeRequestName = "node-" + connection + ".bytes-received";
+                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
+                if (nodeRequest != null)
+                    nodeRequest.record(bytes, now);
+            }
+        }
+    }
+
+}
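
Tying the classes together, a hedged sketch of the intended poll loop (the broker address,
buffer sizes, and the running/requestBytes/handle names are placeholders; Metrics and SystemTime
come from this backport's metrics and utils packages):

    Selector selector = new Selector(30000L, new Metrics(), new SystemTime(),
            "demo", new HashMap<String, String>());
    selector.connect("node-1", new InetSocketAddress("broker-host", 9092), 64 * 1024, 64 * 1024);

    while (running) {
        selector.poll(100);                              // drives connects, reads, and writes
        for (String id : selector.connected())
            selector.send(new NetworkSend(id, ByteBuffer.wrap(requestBytes)));
        for (NetworkReceive receive : selector.completedReceives())
            handle(receive.source(), receive.payload()); // handle() is a placeholder
        for (String id : selector.disconnected())
            System.out.println("lost connection: " + id); // close(id) has already run internally
    }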

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
new file mode 100644
index 0000000..b9e8a50
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.network;
+
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This interface models the in-progress sending of data to a destination identified by a string id.
+ */
+public interface Send {
+
+    /**
+     * The string id for the destination of this send
+     */
+    public String destination();
+
+    /**
+     * Is this send complete?
+     */
+    public boolean completed();
+
+    /**
+     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
+     * to be completely written
+     * @param channel The channel to write to
+     * @return The number of bytes written
+     * @throws IOException If the write fails
+     */
+    public long writeTo(GatheringByteChannel channel) throws IOException;
+
+    /**
+     * Size of the send
+     */
+    public long size();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
new file mode 100644
index 0000000..e12261c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Identifiers for all the Kafka APIs
+ */
+public enum ApiKeys {
+    PRODUCE(0, "Produce"),
+    FETCH(1, "Fetch"),
+    LIST_OFFSETS(2, "Offsets"),
+    METADATA(3, "Metadata"),
+    LEADER_AND_ISR(4, "LeaderAndIsr"),
+    STOP_REPLICA(5, "StopReplica"),
+    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
+    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
+    OFFSET_COMMIT(8, "OffsetCommit"),
+    OFFSET_FETCH(9, "OffsetFetch"),
+    CONSUMER_METADATA(10, "ConsumerMetadata"),
+    JOIN_GROUP(11, "JoinGroup"),
+    HEARTBEAT(12, "Heartbeat");
+
+    private static ApiKeys[] codeToType;
+    public static final int MAX_API_KEY;
+
+    static {
+        int maxKey = -1;
+        for (ApiKeys key : ApiKeys.values()) {
+            maxKey = Math.max(maxKey, key.id);
+        }
+        codeToType = new ApiKeys[maxKey + 1];
+        for (ApiKeys key : ApiKeys.values()) {
+            codeToType[key.id] = key;
+        }
+        MAX_API_KEY = maxKey;
+    }
+
+    /** the permanent and immutable id of an API--this can't ever change */
+    public final short id;
+
+    /** an English description of the API--this is for debugging and can change */
+    public final String name;
+
+    private ApiKeys(int id, String name) {
+        this.id = (short) id;
+        this.name = name;
+    }
+
+    public static ApiKeys forId(int id) {
+        return codeToType[id];
+    }
+}
\ No newline at end of file
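
The static initializer makes id-to-constant lookup a plain array index; for illustration:

    ApiKeys api = ApiKeys.forId(1);
    System.out.println(api.name + " (id=" + api.id + ")"); // prints: Fetch (id=1)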

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
new file mode 100644
index 0000000..5ef3b24
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol;
+
+import org.apache.flink.kafka_backport.common.errors.ApiException;
+import org.apache.flink.kafka_backport.common.errors.IllegalGenerationException;
+import org.apache.flink.kafka_backport.common.errors.InvalidRequiredAcksException;
+import org.apache.flink.kafka_backport.common.errors.InvalidTopicException;
+import org.apache.flink.kafka_backport.common.errors.LeaderNotAvailableException;
+import org.apache.flink.kafka_backport.common.errors.NetworkException;
+import org.apache.flink.kafka_backport.common.errors.NotCoordinatorForConsumerException;
+import org.apache.flink.kafka_backport.common.errors.NotEnoughReplicasException;
+import org.apache.flink.kafka_backport.common.errors.NotLeaderForPartitionException;
+import org.apache.flink.kafka_backport.common.errors.OffsetLoadInProgressException;
+import org.apache.flink.kafka_backport.common.errors.OffsetMetadataTooLarge;
+import org.apache.flink.kafka_backport.common.errors.OffsetOutOfRangeException;
+import org.apache.flink.kafka_backport.common.errors.RecordBatchTooLargeException;
+import org.apache.flink.kafka_backport.common.errors.TimeoutException;
+import org.apache.flink.kafka_backport.common.errors.UnknownConsumerIdException;
+import org.apache.flink.kafka_backport.common.errors.UnknownServerException;
+import org.apache.flink.kafka_backport.common.errors.UnknownTopicOrPartitionException;
+import org.apache.flink.kafka_backport.common.errors.ConsumerCoordinatorNotAvailableException;
+import org.apache.flink.kafka_backport.common.errors.CorruptRecordException;
+import org.apache.flink.kafka_backport.common.errors.NotEnoughReplicasAfterAppendException;
+import org.apache.flink.kafka_backport.common.errors.RecordTooLargeException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
+ * are thus part of the protocol. The names can be changed but the error code cannot.
+ * 
+ * Do not add exceptions that occur only on the client or only on the server here.
+ */
+public enum Errors {
+    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
+    NONE(0, null),
+    OFFSET_OUT_OF_RANGE(1,
+            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+    CORRUPT_MESSAGE(2,
+            new CorruptRecordException("The message contents do not match the message CRC, or the message is otherwise corrupt.")),
+    UNKNOWN_TOPIC_OR_PARTITION(3,
+            new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
+    // TODO: errorCode 4 for InvalidFetchSize
+    LEADER_NOT_AVAILABLE(5,
+            new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+    NOT_LEADER_FOR_PARTITION(6,
+            new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
+    REQUEST_TIMED_OUT(7,
+            new TimeoutException("The request timed out.")),
+    // TODO: errorCode 8 for BrokerNotAvailable
+    REPLICA_NOT_AVAILABLE(9,
+            new ApiException("The replica is not available for the requested topic-partition")),
+    MESSAGE_TOO_LARGE(10,
+            new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+    OFFSET_METADATA_TOO_LARGE(12,
+            new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
+    NETWORK_EXCEPTION(13,
+            new NetworkException("The server disconnected before a response was received.")),
+    OFFSET_LOAD_IN_PROGRESS(14,
+            new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
+    CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
+            new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")),
+    NOT_COORDINATOR_FOR_CONSUMER(16,
+            new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")),
+    INVALID_TOPIC_EXCEPTION(17,
+            new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+    RECORD_LIST_TOO_LARGE(18,
+            new RecordBatchTooLargeException("The request included a message batch larger than the configured segment size on the server.")),
+    NOT_ENOUGH_REPLICAS(19,
+            new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
+    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
+            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
+    INVALID_REQUIRED_ACKS(21,
+            new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
+    ILLEGAL_GENERATION(22,
+            new IllegalGenerationException("Specified consumer generation id is not valid.")),
+    INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23,
+            new ApiException("The request partition assignment strategy does not match that of the group.")),
+    UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24,
+            new ApiException("The request partition assignment strategy is unknown to the broker.")),
+    UNKNOWN_CONSUMER_ID(25,
+            new UnknownConsumerIdException("The coordinator is not aware of this consumer.")),
+    INVALID_SESSION_TIMEOUT(26,
+            new ApiException("The session timeout is not within an acceptable range.")),
+    COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
+            new ApiException("Some of the committing partitions are not assigned to the committer.")),
+    INVALID_COMMIT_OFFSET_SIZE(28,
+            new ApiException("The committed offset data size is not valid."));
+
+    private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
+    private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
+
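+    // Build reverse lookup tables: wire code -> Errors and exception class -> Errors.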
+    static {
+        for (Errors error : Errors.values()) {
+            codeToError.put(error.code(), error);
+            if (error.exception != null)
+                classToError.put(error.exception.getClass(), error);
+        }
+    }
+
+    private final short code;
+    private final ApiException exception;
+
+    private Errors(int code, ApiException exception) {
+        this.code = (short) code;
+        this.exception = exception;
+    }
+
+    /**
+     * An instance of the exception
+     */
+    public ApiException exception() {
+        return this.exception;
+    }
+
+    /**
+     * The error code for the exception
+     */
+    public short code() {
+        return this.code;
+    }
+
+    /**
+     * Throw the exception corresponding to this error if there is one
+     */
+    public void maybeThrow() {
+        if (exception != null) {
+            throw this.exception;
+        }
+    }
+
+    /**
+     * Look up the error for the given wire code (or UNKNOWN if the code is not recognized)
+     */
+    public static Errors forCode(short code) {
+        Errors error = codeToError.get(code);
+        return error == null ? UNKNOWN : error;
+    }
+
+    /**
+     * Return the error instance associated with this exception (or UNKNOWN if there is none)
+     */
+    public static Errors forException(Throwable t) {
+        Errors error = classToError.get(t.getClass());
+        return error == null ? UNKNOWN : error;
+    }
+}
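
Usage sketch (illustrative, not part of the backported file; class name is made up):
translating a wire error code back into an exception. Code 7 maps to REQUEST_TIMED_OUT
in the enum above; note that NONE carries a null exception, for which maybeThrow() is
a no-op.

    import org.apache.flink.kafka_backport.common.protocol.Errors;

    public class ErrorsSketch {
        public static void main(String[] args) {
            short codeFromResponse = 7; // REQUEST_TIMED_OUT, per the enum above
            Errors error = Errors.forCode(codeFromResponse);
            System.out.println("code " + error.code() + " -> " + error.exception().getMessage());
            error.maybeThrow(); // rethrows the shared TimeoutException instance
        }
    }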

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
new file mode 100644
index 0000000..e5ae9a4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ProtoUtils {
+
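+    // Schemas are indexed as [apiKey][version]; both indices are validated before use.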
+    private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
+        if (apiKey < 0 || apiKey >= schemas.length)
+            throw new IllegalArgumentException("Invalid api key: " + apiKey);
+        Schema[] versions = schemas[apiKey];
+        if (version < 0 || version >= versions.length)
+            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
+        return versions[version];
+    }
+
+    public static short latestVersion(int apiKey) {
+        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length)
+            throw new IllegalArgumentException("Invalid api key: " + apiKey);
+        return Protocol.CURR_VERSION[apiKey];
+    }
+
+    public static Schema requestSchema(int apiKey, int version) {
+        return schemaFor(Protocol.REQUESTS, apiKey, version);
+    }
+
+    public static Schema currentRequestSchema(int apiKey) {
+        return requestSchema(apiKey, latestVersion(apiKey));
+    }
+
+    public static Schema responseSchema(int apiKey, int version) {
+        return schemaFor(Protocol.RESPONSES, apiKey, version);
+    }
+
+    public static Schema currentResponseSchema(int apiKey) {
+        return responseSchema(apiKey, latestVersion(apiKey));
+    }
+
+    public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
+        return (Struct) requestSchema(apiKey, version).read(buffer);
+    }
+
+    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
+        return (Struct) currentResponseSchema(apiKey).read(buffer);
+    }
+
+}
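
Usage sketch (illustrative, not part of the backported file): the [apiKey][version]
indexing that schemaFor() implements, shown with plain strings standing in for the
Schema tables defined in Protocol (the names and versions below are made up).

    public class VersionedTableSketch {
        // Mirrors schemaFor: validate both indices, then dereference the table.
        static String lookup(String[][] table, int apiKey, int version) {
            if (apiKey < 0 || apiKey >= table.length)
                throw new IllegalArgumentException("Invalid api key: " + apiKey);
            String[] versions = table[apiKey];
            if (version < 0 || version >= versions.length)
                throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
            return versions[version];
        }

        public static void main(String[] args) {
            String[][] requests = {
                { "ProduceRequest v0", "ProduceRequest v1" },  // apiKey 0
                { "FetchRequest v0" }                          // apiKey 1
            };
            System.out.println(lookup(requests, 0, 1)); // prints: ProduceRequest v1
        }
    }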