Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:22 UTC

[05/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selector.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selector.java
deleted file mode 100644
index d229211..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Selector.java
+++ /dev/null
@@ -1,655 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import org.apache.kafka.copied.common.KafkaException;
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.metrics.Measurable;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-import org.apache.kafka.copied.common.metrics.Metrics;
-import org.apache.kafka.copied.common.metrics.Sensor;
-import org.apache.kafka.copied.common.metrics.stats.Avg;
-import org.apache.kafka.copied.common.metrics.stats.Count;
-import org.apache.kafka.copied.common.metrics.stats.Max;
-import org.apache.kafka.copied.common.metrics.stats.Rate;
-import org.apache.kafka.copied.common.utils.SystemTime;
-import org.apache.kafka.copied.common.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A nioSelector interface for doing non-blocking multi-connection network I/O.
- * <p>
- * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
- * responses.
- * <p>
- * A connection can be added to the nioSelector associated with a string id by doing
- * 
- * <pre>
- * nioSelector.connect(&quot;42&quot;, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
- * </pre>
- * 
- * The connect call does not block on the creation of the TCP connection, so it only initiates the connection. A
- * successful invocation of this method does not mean a valid connection has been established.
- * 
- * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
- * connections are all done using the <code>poll()</code> call.
- * 
- * <pre>
- * nioSelector.send(new NetworkSend(&quot;42&quot;, myBytes));
- * nioSelector.poll(TIMEOUT_MS);
- * </pre>
- * 
- * The nioSelector maintains several lists that are available via various getters. These are reset by each call to
- * <code>poll()</code>.
- * 
- * This class is not thread safe!
- */
-public class Selector implements Selectable {
-
-    private static final Logger log = LoggerFactory.getLogger(Selector.class);
-
-    private final java.nio.channels.Selector nioSelector;
-    private final Map<String, SelectionKey> keys;
-    private final List<Send> completedSends;
-    private final List<NetworkReceive> completedReceives;
-    private final List<String> disconnected;
-    private final List<String> connected;
-    private final List<String> failedSends;
-    private final Time time;
-    private final SelectorMetrics sensors;
-    private final String metricGrpPrefix;
-    private final Map<String, String> metricTags;
-    private final Map<String, Long> lruConnections;
-    private final long connectionsMaxIdleNanos;
-    private final int maxReceiveSize;
-    private final boolean metricsPerConnection;
-    private long currentTimeNanos;
-    private long nextIdleCloseCheckTime;
-
-
-    /**
-     * Create a new nioSelector
-     */
-    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
-        try {
-            this.nioSelector = java.nio.channels.Selector.open();
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-        this.maxReceiveSize = maxReceiveSize;
-        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
-        this.time = time;
-        this.metricGrpPrefix = metricGrpPrefix;
-        this.metricTags = metricTags;
-        this.keys = new HashMap<String, SelectionKey>();
-        this.completedSends = new ArrayList<Send>();
-        this.completedReceives = new ArrayList<NetworkReceive>();
-        this.connected = new ArrayList<String>();
-        this.disconnected = new ArrayList<String>();
-        this.failedSends = new ArrayList<String>();
-        this.sensors = new SelectorMetrics(metrics);
-        // initial capacity and load factor are the defaults; we pass them explicitly only so we can set accessOrder = true
-        this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
-        currentTimeNanos = new SystemTime().nanoseconds();
-        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
-        this.metricsPerConnection = metricsPerConnection;
-    }
-
-    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
-    }
-
-    /**
-     * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
-     * number.
-     * <p>
-     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
-     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
-     * @param id The id for the new connection
-     * @param address The address to connect to
-     * @param sendBufferSize The send buffer for the new connection
-     * @param receiveBufferSize The receive buffer for the new connection
-     * @throws IllegalStateException if there is already a connection for that id
-     * @throws IOException if DNS resolution fails on the hostname or if the broker is down
-     */
-    @Override
-    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
-        if (this.keys.containsKey(id))
-            throw new IllegalStateException("There is already a connection for id " + id);
-
-        SocketChannel channel = SocketChannel.open();
-        channel.configureBlocking(false);
-        Socket socket = channel.socket();
-        socket.setKeepAlive(true);
-        socket.setSendBufferSize(sendBufferSize);
-        socket.setReceiveBufferSize(receiveBufferSize);
-        socket.setTcpNoDelay(true);
-        try {
-            channel.connect(address);
-        } catch (UnresolvedAddressException e) {
-            channel.close();
-            throw new IOException("Can't resolve address: " + address, e);
-        } catch (IOException e) {
-            channel.close();
-            throw e;
-        }
-        SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
-    }
-
-    /**
-     * Register an existing channel with this nioSelector. Use this on the server side, when a connection is accepted
-     * by a different thread but processed by the Selector.
-     * Note that we do not check whether the connection id is valid, since the connection already exists.
-     */
-    public void register(String id, SocketChannel channel) throws ClosedChannelException {
-        SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
-    }
-
-    /**
-     * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
-     * processed until the next {@link #poll(long) poll()} call.
-     */
-    @Override
-    public void disconnect(String id) {
-        SelectionKey key = this.keys.get(id);
-        if (key != null)
-            key.cancel();
-    }
-
-    /**
-     * Interrupt the nioSelector if it is blocked waiting to do I/O.
-     */
-    @Override
-    public void wakeup() {
-        this.nioSelector.wakeup();
-    }
-
-    /**
-     * Close this selector and all associated connections
-     */
-    @Override
-    public void close() {
-        List<String> connections = new LinkedList<String>(keys.keySet());
-        for (String id: connections)
-            close(id);
-
-        try {
-            this.nioSelector.close();
-        } catch (IOException e) {
-            log.error("Exception closing nioSelector:", e);
-        }
-    }
-
-    /**
-     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
-     * @param send The request to send
-     */
-    public void send(Send send) {
-        SelectionKey key = keyForId(send.destination());
-        Transmissions transmissions = transmissions(key);
-        if (transmissions.hasSend())
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
-        transmissions.send = send;
-        try {
-            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-        } catch (CancelledKeyException e) {
-            close(transmissions.id);
-            this.failedSends.add(send.destination());
-        }
-    }
-
-    /**
-     * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
-     * disconnections, initiating new sends, or making progress on in-progress sends or receives.
-     * 
-     * When this call is completed the user can check for completed sends, receives, connections or disconnects using
-     * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
-     * lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if any
-     * I/O completed.
-     * 
-     * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
-     * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
-     *         already an in-progress send
-     */
-    @Override
-    public void poll(long timeout) throws IOException {
-        clear();
-
-        /* check ready keys */
-        long startSelect = time.nanoseconds();
-        int readyKeys = select(timeout);
-        long endSelect = time.nanoseconds();
-        currentTimeNanos = endSelect;
-        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
-
-        if (readyKeys > 0) {
-            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
-            Iterator<SelectionKey> iter = keys.iterator();
-            while (iter.hasNext()) {
-                SelectionKey key = iter.next();
-                iter.remove();
-
-                Transmissions transmissions = transmissions(key);
-                SocketChannel channel = channel(key);
-
-                // register all per-connection metrics at once
-                sensors.maybeRegisterConnectionMetrics(transmissions.id);
-                lruConnections.put(transmissions.id, currentTimeNanos);
-
-                try {
-                    /* complete any connections that have finished their handshake */
-                    if (key.isConnectable()) {
-                        channel.finishConnect();
-                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
-                        this.connected.add(transmissions.id);
-                        this.sensors.connectionCreated.record();
-                        log.debug("Connection {} created", transmissions.id);
-                    }
-
-                    /* read from any connections that have readable data */
-                    if (key.isReadable()) {
-                        if (!transmissions.hasReceive())
-                            transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
-                        try {
-                            transmissions.receive.readFrom(channel);
-                        } catch (InvalidReceiveException e) {
-                            log.error("Invalid data received from " + transmissions.id + " closing connection", e);
-                            close(transmissions.id);
-                            this.disconnected.add(transmissions.id);
-                            throw e;
-                        }
-                        if (transmissions.receive.complete()) {
-                            transmissions.receive.payload().rewind();
-                            this.completedReceives.add(transmissions.receive);
-                            this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
-                            transmissions.clearReceive();
-                        }
-                    }
-
-                    /* write to any sockets that have space in their buffer and for which we have data */
-                    if (key.isWritable()) {
-                        transmissions.send.writeTo(channel);
-                        if (transmissions.send.completed()) {
-                            this.completedSends.add(transmissions.send);
-                            this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
-                            transmissions.clearSend();
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                        }
-                    }
-
-                    /* cancel any defunct sockets */
-                    if (!key.isValid()) {
-                        close(transmissions.id);
-                        this.disconnected.add(transmissions.id);
-                    }
-                } catch (IOException e) {
-                    String desc = socketDescription(channel);
-                    if (e instanceof EOFException || e instanceof ConnectException)
-                        log.debug("Connection {} disconnected", desc);
-                    else
-                        log.warn("Error in I/O with connection to {}", desc, e);
-                    close(transmissions.id);
-                    this.disconnected.add(transmissions.id);
-                }
-            }
-        }
-        long endIo = time.nanoseconds();
-        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
-        maybeCloseOldestConnection();
-    }
-
-    private String socketDescription(SocketChannel channel) {
-        Socket socket = channel.socket();
-        if (socket == null)
-            return "[unconnected socket]";
-        else if (socket.getInetAddress() != null)
-            return socket.getInetAddress().toString();
-        else
-            return socket.getLocalAddress().toString();
-    }
-
-    @Override
-    public List<Send> completedSends() {
-        return this.completedSends;
-    }
-
-    @Override
-    public List<NetworkReceive> completedReceives() {
-        return this.completedReceives;
-    }
-
-    @Override
-    public List<String> disconnected() {
-        return this.disconnected;
-    }
-
-    @Override
-    public List<String> connected() {
-        return this.connected;
-    }
-
-    @Override
-    public void mute(String id) {
-        mute(this.keyForId(id));
-    }
-
-    private void mute(SelectionKey key) {
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
-    }
-
-    @Override
-    public void unmute(String id) {
-        unmute(this.keyForId(id));
-    }
-
-    private void unmute(SelectionKey key) {
-        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
-    }
-
-    @Override
-    public void muteAll() {
-        for (SelectionKey key : this.keys.values())
-            mute(key);
-    }
-
-    @Override
-    public void unmuteAll() {
-        for (SelectionKey key : this.keys.values())
-            unmute(key);
-    }
-
-    private void maybeCloseOldestConnection() {
-        if (currentTimeNanos > nextIdleCloseCheckTime) {
-            if (lruConnections.isEmpty()) {
-                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
-            } else {
-                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
-                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
-                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
-                if (currentTimeNanos > nextIdleCloseCheckTime) {
-                    String connectionId = oldestConnectionEntry.getKey();
-                    if (log.isTraceEnabled())
-                        log.trace("About to close the idle connection from " + connectionId
-                                + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
-
-                    disconnected.add(connectionId);
-                    close(connectionId);
-                }
-            }
-        }
-    }
-
-    /**
-     * Clear the results from the prior poll
-     */
-    private void clear() {
-        this.completedSends.clear();
-        this.completedReceives.clear();
-        this.connected.clear();
-        this.disconnected.clear();
-        this.disconnected.addAll(this.failedSends);
-        this.failedSends.clear();
-    }
-
-    /**
-     * Check for data, waiting up to the given timeout.
-     * 
-     * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
-     * @return The number of keys ready
-     * @throws IOException
-     */
-    private int select(long ms) throws IOException {
-        if (ms == 0L)
-            return this.nioSelector.selectNow();
-        else if (ms < 0L)
-            return this.nioSelector.select();
-        else
-            return this.nioSelector.select(ms);
-    }
-
-    /**
-     * Begin closing this connection
-     */
-    public void close(String id) {
-        SelectionKey key = keyForId(id);
-        lruConnections.remove(id);
-        SocketChannel channel = channel(key);
-        Transmissions trans = transmissions(key);
-        if (trans != null) {
-            this.keys.remove(trans.id);
-            trans.clearReceive();
-            trans.clearSend();
-        }
-        key.attach(null);
-        key.cancel();
-        try {
-            channel.socket().close();
-            channel.close();
-        } catch (IOException e) {
-            log.error("Exception closing connection to node {}:", trans.id, e);
-        }
-        this.sensors.connectionClosed.record();
-    }
-
-    /**
-     * Get the selection key associated with this connection id
-     */
-    private SelectionKey keyForId(String id) {
-        SelectionKey key = this.keys.get(id);
-        if (key == null)
-            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
-        return key;
-    }
-
-    /**
-     * Get the transmissions for the given connection
-     */
-    private Transmissions transmissions(SelectionKey key) {
-        return (Transmissions) key.attachment();
-    }
-
-    /**
-     * Get the socket channel associated with this selection key
-     */
-    private SocketChannel channel(SelectionKey key) {
-        return (SocketChannel) key.channel();
-    }
-
-    /**
-     * The id and in-progress send and receive associated with a connection
-     */
-    private static class Transmissions {
-        public String id;
-        public Send send;
-        public NetworkReceive receive;
-
-        public Transmissions(String id) {
-            this.id = id;
-        }
-
-        public boolean hasSend() {
-            return this.send != null;
-        }
-
-        public void clearSend() {
-            this.send = null;
-        }
-
-        public boolean hasReceive() {
-            return this.receive != null;
-        }
-
-        public void clearReceive() {
-            this.receive = null;
-        }
-    }
-
-    private class SelectorMetrics {
-        private final Metrics metrics;
-        public final Sensor connectionClosed;
-        public final Sensor connectionCreated;
-        public final Sensor bytesTransferred;
-        public final Sensor bytesSent;
-        public final Sensor bytesReceived;
-        public final Sensor selectTime;
-        public final Sensor ioTime;
-
-        public SelectorMetrics(Metrics metrics) {
-            this.metrics = metrics;
-            String metricGrpName = metricGrpPrefix + "-metrics";
-            StringBuilder tagsSuffix = new StringBuilder();
-
-            for (Map.Entry<String, String> tag: metricTags.entrySet()) {
-                tagsSuffix.append(tag.getKey());
-                tagsSuffix.append("-");
-                tagsSuffix.append(tag.getValue());
-            }
-
-            this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
-            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
-            this.connectionClosed.add(metricName, new Rate());
-
-            this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
-            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
-            this.connectionCreated.add(metricName, new Rate());
-
-            this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
-            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
-            bytesTransferred.add(metricName, new Rate(new Count()));
-
-            this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
-            this.bytesSent.add(metricName, new Rate());
-            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
-            this.bytesSent.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
-            this.bytesSent.add(metricName, new Avg());
-            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
-            this.bytesSent.add(metricName, new Max());
-
-            this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
-            this.bytesReceived.add(metricName, new Rate());
-            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
-            this.bytesReceived.add(metricName, new Rate(new Count()));
-
-            this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
-            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
-            this.selectTime.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
-            this.selectTime.add(metricName, new Avg());
-            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
-            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-
-            this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
-            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
-            this.ioTime.add(metricName, new Avg());
-            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
-            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-
-            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
-            this.metrics.addMetric(metricName, new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return keys.size();
-                }
-            });
-        }
-
-        public void maybeRegisterConnectionMetrics(String connectionId) {
-            if (!connectionId.isEmpty() && metricsPerConnection) {
-                // if one sensor of the metrics has been registered for the connection,
-                // then all other sensors should have been registered; and vice versa
-                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest == null) {
-                    String metricGrpName = metricGrpPrefix + "-node-metrics";
-
-                    Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
-                    tags.put("node-id", "node-" + connectionId);
-
-                    nodeRequest = this.metrics.sensor(nodeRequestName);
-                    MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
-                    nodeRequest.add(metricName, new Rate());
-                    metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
-                    nodeRequest.add(metricName, new Rate(new Count()));
-                    metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
-                    nodeRequest.add(metricName, new Avg());
-                    metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
-                    nodeRequest.add(metricName, new Max());
-
-                    String nodeResponseName = "node-" + connectionId + ".bytes-received";
-                    Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
-                    metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
-                    nodeResponse.add(metricName, new Rate());
-                    metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
-                    nodeResponse.add(metricName, new Rate(new Count()));
-
-                    String nodeTimeName = "node-" + connectionId + ".latency";
-                    Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
-                    metricName = new MetricName("request-latency-avg", metricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Avg());
-                    metricName = new MetricName("request-latency-max", metricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Max());
-                }
-            }
-        }
-
-        public void recordBytesSent(String connectionId, long bytes) {
-            long now = time.milliseconds();
-            this.bytesSent.record(bytes, now);
-            if (!connectionId.isEmpty()) {
-                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null)
-                    nodeRequest.record(bytes, now);
-            }
-        }
-
-        public void recordBytesReceived(String connection, int bytes) {
-            long now = time.milliseconds();
-            this.bytesReceived.record(bytes, now);
-            if (!connection.isEmpty()) {
-                String nodeRequestName = "node-" + connection + ".bytes-received";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest != null)
-                    nodeRequest.record(bytes, now);
-            }
-        }
-    }
-
-}
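
As a companion to the class javadoc above, here is a minimal driver loop for this backported Selector. It is a sketch only: the broker address, the one-byte payload, and the assumption that NetworkSend takes (String destination, ByteBuffer... buffers) as in the sibling backported sources are illustrative, not guaranteed by this diff.

    import org.apache.kafka.copied.common.metrics.Metrics;
    import org.apache.kafka.copied.common.network.NetworkReceive;
    import org.apache.kafka.copied.common.network.NetworkSend;
    import org.apache.kafka.copied.common.network.Selector;
    import org.apache.kafka.copied.common.utils.SystemTime;

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.util.HashMap;

    public class SelectorSketch {
        public static void main(String[] args) throws Exception {
            // connectionMaxIdleMs = 9 minutes; the 5-arg constructor enables per-connection metrics.
            Selector selector = new Selector(9 * 60 * 1000L, new Metrics(), new SystemTime(),
                    "sketch", new HashMap<String, String>());

            // connect() only initiates the TCP connection; a later poll() completes
            // it and reports the id via connected().
            selector.connect("broker-0", new InetSocketAddress("localhost", 9092), 64 * 1024, 64 * 1024);

            // At most one in-flight send per connection id; a second send() before
            // completion throws IllegalStateException.
            selector.send(new NetworkSend("broker-0", ByteBuffer.wrap(new byte[]{0})));

            // poll() performs all ready I/O and repopulates the result lists, which
            // are cleared again at the start of the next poll().
            selector.poll(1000L);
            for (NetworkReceive receive : selector.completedReceives())
                System.out.println("received " + receive.payload().remaining() + " bytes");

            selector.close();
        }
    }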

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Send.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Send.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Send.java
deleted file mode 100644
index a55b306..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/network/Send.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.network;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * This interface models the in-progress sending of data to a destination identified by a string id.
- */
-public interface Send {
-
-    /**
-     * The string id for the destination of this send
-     */
-    public String destination();
-
-    /**
-     * Is this send complete?
-     */
-    public boolean completed();
-
-    /**
-     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
-     * to be completely written.
-     * @param channel The channel to write to
-     * @return The number of bytes written
-     * @throws IOException If the write fails
-     */
-    public long writeTo(GatheringByteChannel channel) throws IOException;
-
-    /**
-     * Size of the send
-     */
-    public long size();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ApiKeys.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ApiKeys.java
deleted file mode 100644
index 2b2f3da..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ApiKeys.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.protocol;
-
-/**
- * Identifiers for all the Kafka APIs
- */
-public enum ApiKeys {
-    PRODUCE(0, "Produce"),
-    FETCH(1, "Fetch"),
-    LIST_OFFSETS(2, "Offsets"),
-    METADATA(3, "Metadata"),
-    LEADER_AND_ISR(4, "LeaderAndIsr"),
-    STOP_REPLICA(5, "StopReplica"),
-    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
-    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
-    OFFSET_COMMIT(8, "OffsetCommit"),
-    OFFSET_FETCH(9, "OffsetFetch"),
-    CONSUMER_METADATA(10, "ConsumerMetadata"),
-    JOIN_GROUP(11, "JoinGroup"),
-    HEARTBEAT(12, "Heartbeat");
-
-    private static ApiKeys[] codeToType;
-    public static final int MAX_API_KEY;
-
-    static {
-        int maxKey = -1;
-        for (ApiKeys key : ApiKeys.values()) {
-            maxKey = Math.max(maxKey, key.id);
-        }
-        codeToType = new ApiKeys[maxKey + 1];
-        for (ApiKeys key : ApiKeys.values()) {
-            codeToType[key.id] = key;
-        }
-        MAX_API_KEY = maxKey;
-    }
-
-    /** the permanent and immutable id of an API--this can never change */
-    public final short id;
-
-    /** an English description of the API--this is for debugging and can change */
-    public final String name;
-
-    private ApiKeys(int id, String name) {
-        this.id = (short) id;
-        this.name = name;
-    }
-
-    public static ApiKeys forId(int id) {
-        return codeToType[id];
-    }
-}
\ No newline at end of file
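
The static block above builds a dense id-to-enum table so that forId() is a plain array lookup. A small usage sketch (the printed values follow from the 13 APIs in this backported snapshot):

    import org.apache.kafka.copied.common.protocol.ApiKeys;

    public class ApiKeysSketch {
        public static void main(String[] args) {
            // Valid ids run from 0 to MAX_API_KEY; an out-of-range id would throw
            // ArrayIndexOutOfBoundsException from forId().
            ApiKeys key = ApiKeys.forId(1);
            System.out.println(key.name + " / " + key.id);  // Fetch / 1
            System.out.println(ApiKeys.MAX_API_KEY);        // 12 (HEARTBEAT)
        }
    }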

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Errors.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Errors.java
deleted file mode 100644
index 7703fc0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Errors.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.protocol;
-
-import org.apache.kafka.copied.common.errors.ApiException;
-import org.apache.kafka.copied.common.errors.ConsumerCoordinatorNotAvailableException;
-import org.apache.kafka.copied.common.errors.CorruptRecordException;
-import org.apache.kafka.copied.common.errors.IllegalGenerationException;
-import org.apache.kafka.copied.common.errors.InvalidRequiredAcksException;
-import org.apache.kafka.copied.common.errors.InvalidTopicException;
-import org.apache.kafka.copied.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.copied.common.errors.NetworkException;
-import org.apache.kafka.copied.common.errors.NotCoordinatorForConsumerException;
-import org.apache.kafka.copied.common.errors.NotEnoughReplicasAfterAppendException;
-import org.apache.kafka.copied.common.errors.NotEnoughReplicasException;
-import org.apache.kafka.copied.common.errors.NotLeaderForPartitionException;
-import org.apache.kafka.copied.common.errors.OffsetLoadInProgressException;
-import org.apache.kafka.copied.common.errors.OffsetMetadataTooLarge;
-import org.apache.kafka.copied.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.copied.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.copied.common.errors.RecordTooLargeException;
-import org.apache.kafka.copied.common.errors.TimeoutException;
-import org.apache.kafka.copied.common.errors.UnknownConsumerIdException;
-import org.apache.kafka.copied.common.errors.UnknownServerException;
-import org.apache.kafka.copied.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
- * are thus part of the protocol. The names can be changed but the error code cannot.
- * 
- * Do not add exceptions that occur only on the client or only on the server here.
- */
-public enum Errors {
-    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
-    NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1,
-            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2,
-            new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
-    UNKNOWN_TOPIC_OR_PARTITION(3,
-            new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    // TODO: errorCode 4 for InvalidFetchSize
-    LEADER_NOT_AVAILABLE(5,
-            new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
-    NOT_LEADER_FOR_PARTITION(6,
-            new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
-    REQUEST_TIMED_OUT(7,
-            new TimeoutException("The request timed out.")),
-    // TODO: errorCode 8 for BrokerNotAvailable
-    REPLICA_NOT_AVAILABLE(9,
-            new ApiException("The replica is not available for the requested topic-partition")),
-    MESSAGE_TOO_LARGE(10,
-            new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
-    OFFSET_METADATA_TOO_LARGE(12,
-            new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13,
-            new NetworkException("The server disconnected before a response was received.")),
-    OFFSET_LOAD_IN_PROGRESS(14,
-            new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
-    CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
-            new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")),
-    NOT_COORDINATOR_FOR_CONSUMER(16,
-            new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")),
-    INVALID_TOPIC_EXCEPTION(17,
-            new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
-    RECORD_LIST_TOO_LARGE(18,
-            new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
-    NOT_ENOUGH_REPLICAS(19,
-            new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
-    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
-            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
-    INVALID_REQUIRED_ACKS(21,
-            new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
-    ILLEGAL_GENERATION(22,
-            new IllegalGenerationException("Specified consumer generation id is not valid.")),
-    INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23,
-            new ApiException("The request partition assignment strategy does not match that of the group.")),
-    UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24,
-            new ApiException("The request partition assignment strategy is unknown to the broker.")),
-    UNKNOWN_CONSUMER_ID(25,
-            new UnknownConsumerIdException("The coordinator is not aware of this consumer.")),
-    INVALID_SESSION_TIMEOUT(26,
-            new ApiException("The session timeout is not within an acceptable range.")),
-    COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
-            new ApiException("Some of the committing partitions are not assigned the committer")),
-    INVALID_COMMIT_OFFSET_SIZE(28,
-            new ApiException("The committing offset data size is not valid"));
-
-    private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
-    private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
-
-    static {
-        for (Errors error : Errors.values()) {
-            codeToError.put(error.code(), error);
-            if (error.exception != null)
-                classToError.put(error.exception.getClass(), error);
-        }
-    }
-
-    private final short code;
-    private final ApiException exception;
-
-    private Errors(int code, ApiException exception) {
-        this.code = (short) code;
-        this.exception = exception;
-    }
-
-    /**
-     * An instance of the exception
-     */
-    public ApiException exception() {
-        return this.exception;
-    }
-
-    /**
-     * The error code for the exception
-     */
-    public short code() {
-        return this.code;
-    }
-
-    /**
-     * Throw the exception corresponding to this error if there is one
-     */
-    public void maybeThrow() {
-        if (exception != null) {
-            throw this.exception;
-        }
-    }
-
-    /**
-     * Look up the error by its code (or UNKNOWN if the code is not recognized)
-     */
-    public static Errors forCode(short code) {
-        Errors error = codeToError.get(code);
-        return error == null ? UNKNOWN : error;
-    }
-
-    /**
-     * Return the error instance associated with this exception (or UNKNOWN if there is none)
-     */
-    public static Errors forException(Throwable t) {
-        Errors error = classToError.get(t.getClass());
-        return error == null ? UNKNOWN : error;
-    }
-}
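
The two static maps above support both directions of the mapping: wire code to enum (forCode) and exception class to enum (forException). A sketch of how callers typically use them:

    import org.apache.kafka.copied.common.errors.TimeoutException;
    import org.apache.kafka.copied.common.protocol.Errors;

    public class ErrorsSketch {
        public static void main(String[] args) {
            // Decode a code received from the wire; unrecognized codes fall back to UNKNOWN.
            Errors error = Errors.forCode((short) 7);
            System.out.println(error + " -> " + error.code());  // REQUEST_TIMED_OUT -> 7

            // Encode an exception for the wire.
            System.out.println(Errors.forException(new TimeoutException("t")).code());  // 7

            // maybeThrow() rethrows the canned exception instance for non-NONE errors.
            try {
                error.maybeThrow();
            } catch (TimeoutException e) {
                System.out.println("caught: " + e.getMessage());  // "The request timed out."
            }
        }
    }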

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ProtoUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ProtoUtils.java
deleted file mode 100644
index 767964f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/ProtoUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.protocol;
-
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ProtoUtils {
-
-    private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
-        if (apiKey < 0 || apiKey >= schemas.length)
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        Schema[] versions = schemas[apiKey];
-        if (version < 0 || version >= versions.length)
-            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
-        return versions[version];
-    }
-
-    public static short latestVersion(int apiKey) {
-        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length)
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        return Protocol.CURR_VERSION[apiKey];
-    }
-
-    public static Schema requestSchema(int apiKey, int version) {
-        return schemaFor(Protocol.REQUESTS, apiKey, version);
-    }
-
-    public static Schema currentRequestSchema(int apiKey) {
-        return requestSchema(apiKey, latestVersion(apiKey));
-    }
-
-    public static Schema responseSchema(int apiKey, int version) {
-        return schemaFor(Protocol.RESPONSES, apiKey, version);
-    }
-
-    public static Schema currentResponseSchema(int apiKey) {
-        return schemaFor(Protocol.RESPONSES, apiKey, latestVersion(apiKey));
-    }
-
-    public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
-        return (Struct) requestSchema(apiKey, version).read(buffer);
-    }
-
-    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
-        return (Struct) currentResponseSchema(apiKey).read(buffer);
-    }
-
-}
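
Putting the lookups together: the sketch below resolves the newest Metadata request schema and round-trips a Struct through a buffer. It assumes the Struct/Schema API of the sibling protocol.types package (set/sizeOf/writeTo/getArray as in upstream Kafka), which is not shown in this diff.

    import org.apache.kafka.copied.common.protocol.ApiKeys;
    import org.apache.kafka.copied.common.protocol.ProtoUtils;
    import org.apache.kafka.copied.common.protocol.types.Schema;
    import org.apache.kafka.copied.common.protocol.types.Struct;

    import java.nio.ByteBuffer;

    public class ProtoUtilsSketch {
        public static void main(String[] args) {
            int apiKey = ApiKeys.METADATA.id;
            short version = ProtoUtils.latestVersion(apiKey);
            Schema schema = ProtoUtils.requestSchema(apiKey, version);

            // Build a Metadata request for one topic, serialize it, and parse it back.
            Struct request = new Struct(schema).set("topics", new Object[]{"my-topic"});
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.flip();

            Struct parsed = ProtoUtils.parseRequest(apiKey, version, buffer);
            System.out.println(parsed.getArray("topics").length + " topic(s)");  // 1 topic(s)
        }
    }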

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Protocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Protocol.java
deleted file mode 100644
index dc33ae6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/protocol/Protocol.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.protocol;
-
-import org.apache.kafka.copied.common.protocol.types.ArrayOf;
-import org.apache.kafka.copied.common.protocol.types.Field;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-
-import static org.apache.kafka.copied.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.copied.common.protocol.types.Type.INT16;
-import static org.apache.kafka.copied.common.protocol.types.Type.INT32;
-import static org.apache.kafka.copied.common.protocol.types.Type.INT64;
-import static org.apache.kafka.copied.common.protocol.types.Type.STRING;
-
-public class Protocol {
-
-    public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
-                                                           new Field("api_version", INT16, "The version of the API."),
-                                                           new Field("correlation_id",
-                                                                     INT32,
-                                                                     "A user-supplied integer value that will be passed back with the response"),
-                                                           new Field("client_id",
-                                                                     STRING,
-                                                                     "A user specified identifier for the client making the request."));
-
-    public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
-                                                                      INT32,
-                                                                      "The user-supplied value passed in with the request"));
-
-    /* Metadata api */
-
-    public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
-                                                                          new ArrayOf(STRING),
-                                                                          "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
-
-    public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
-                                                   new Field("host", STRING, "The hostname of the broker."),
-                                                   new Field("port",
-                                                             INT32,
-                                                             "The port on which the broker accepts requests."));
-
-    public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
-                                                                            INT16,
-                                                                            "The error code for the partition, if any."),
-                                                                  new Field("partition_id",
-                                                                            INT32,
-                                                                            "The id of the partition."),
-                                                                  new Field("leader",
-                                                                            INT32,
-                                                                            "The id of the broker acting as leader for this partition."),
-                                                                  new Field("replicas",
-                                                                            new ArrayOf(INT32),
-                                                                            "The set of all nodes that host this partition."),
-                                                                  new Field("isr",
-                                                                            new ArrayOf(INT32),
-                                                                            "The set of nodes that are in sync with the leader for this partition."));
-
-    public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
-                                                                        INT16,
-                                                                        "The error code for the given topic."),
-                                                              new Field("topic", STRING, "The name of the topic"),
-                                                              new Field("partition_metadata",
-                                                                        new ArrayOf(PARTITION_METADATA_V0),
-                                                                        "Metadata for each partition of the topic."));
-
-    public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
-                                                                           new ArrayOf(BROKER),
-                                                                           "Host and port information for all brokers."),
-                                                                 new Field("topic_metadata",
-                                                                           new ArrayOf(TOPIC_METADATA_V0)));
-
-    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
-    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
-
-    /* Produce api */
-
-    public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
-                                                                  new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
-                                                                                                     new Field("record_set", BYTES)))));
-
-    public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
-                                                                   INT16,
-                                                                   "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
-                                                               new Field("timeout", INT32, "The time to await a response in ms."),
-                                                               new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
-    public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                    new ArrayOf(new Schema(new Field("topic", STRING),
-                                                                                           new Field("partition_responses",
-                                                                                                     new ArrayOf(new Schema(new Field("partition",
-                                                                                                                                      INT32),
-                                                                                                                            new Field("error_code",
-                                                                                                                                      INT16),
-                                                                                                                            new Field("base_offset",
-                                                                                                                                      INT64))))))));
-
-    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
-    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
-
-    /* Offset commit api */
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Message offset to be committed."),
-                                                                               new Field("metadata",
-                                                                                         STRING,
-                                                                                         "Any associated metadata the client wants to keep."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Message offset to be committed."),
-                                                                               new Field("timestamp",
-                                                                                         INT64,
-                                                                                         "Timestamp of the commit."),
-                                                                               new Field("metadata",
-                                                                                         STRING,
-                                                                                         "Any associated metadata the client wants to keep."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Message offset to be committed."),
-                                                                               new Field("metadata",
-                                                                                         STRING,
-                                                                                         "Any associated metadata the client wants to keep."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                                     STRING,
-                                                                                     "Topic to commit offsets for."),
-                                                                           new Field("partitions",
-                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
-                                                                                     "Partitions to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
-                                                                                     STRING,
-                                                                                     "Topic to commit offsets for."),
-                                                                           new Field("partitions",
-                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
-                                                                                     "Partitions to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
-                                                                                     STRING,
-                                                                                     "Topic to commit offsets for."),
-                                                                           new Field("partitions",
-                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
-                                                                                     "Partitions to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                               STRING,
-                                                                               "The consumer group id."),
-                                                                     new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
-                                                                               "Topics to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
-                                                                               STRING,
-                                                                               "The consumer group id."),
-                                                                     new Field("group_generation_id",
-                                                                               INT32,
-                                                                               "The generation of the consumer group."),
-                                                                     new Field("consumer_id",
-                                                                               STRING,
-                                                                               "The consumer id assigned by the group coordinator."),
-                                                                     new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
-                                                                               "Topics to commit offsets."));
-
-    public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
-                                                                               STRING,
-                                                                               "The consumer group id."),
-                                                                     new Field("group_generation_id",
-                                                                               INT32,
-                                                                               "The generation of the consumer group."),
-                                                                     new Field("consumer_id",
-                                                                               STRING,
-                                                                               "The consumer id assigned by the group coordinator."),
-                                                                     new Field("retention_time",
-                                                                               INT64,
-                                                                               "Time period in ms to retain the offset."),
-                                                                     new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
-                                                                               "Topics to commit offsets."));
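-
-    /* Version evolution of the offset commit request: v1 adds the group
-     * generation id, the consumer id, and a per-partition commit timestamp;
-     * v2 drops the per-partition timestamp again in favor of a single
-     * retention_time for the whole request. */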
-
-    public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                          INT32,
-                                                                                          "Topic partition id."),
-                                                                                new Field("error_code",
-                                                                                          INT16));
-
-    public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                            new Field("partition_responses",
-                                                                                      new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
-
-    public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                                new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
-
-    /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
-    public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
-    public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
-
-    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
-
-    /* Offset fetch api */
-
-    /*
-     * The wire formats of versions 0 and 1 are identical, but they differ in behavior:
-     * version 0 reads the offsets from ZooKeeper, while
-     * version 1 reads the offsets from Kafka.
-     */
-    public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                        INT32,
-                                                                                        "Topic partition id."));
-
-    public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                                    STRING,
-                                                                                    "Topic to fetch offsets for."),
-                                                                          new Field("partitions",
-                                                                                    new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
-                                                                                    "Partitions to fetch offsets."));
-
-    public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                              STRING,
-                                                                              "The consumer group id."),
-                                                                    new Field("topics",
-                                                                              new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
-                                                                              "Topics to fetch offsets."));
-
-    public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                         INT32,
-                                                                                         "Topic partition id."),
-                                                                               new Field("offset",
-                                                                                         INT64,
-                                                                                         "Last committed message offset."),
-                                                                               new Field("metadata",
-                                                                                         STRING,
-                                                                                         "Any associated metadata the client wants to keep."),
-                                                                               new Field("error_code", INT16));
-
-    public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                           new Field("partition_responses",
-                                                                                     new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
-
-    public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
-
-    public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
-    public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
-
-    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1};
-
-    /* List offset api */
-    public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                       INT32,
-                                                                                       "Topic partition id."),
-                                                                             new Field("timestamp", INT64, "Timestamp."),
-                                                                             new Field("max_num_offsets",
-                                                                                       INT32,
-                                                                                       "Maximum offsets to return."));
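-
-    /* By Kafka convention, the timestamp above may also be set to -1 to
-     * request the latest offset, or -2 to request the earliest available
-     * offset. */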
-
-    public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                                   STRING,
-                                                                                   "Topic to list offsets for."),
-                                                                         new Field("partitions",
-                                                                                   new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
-                                                                                   "Partitions to list offsets for."));
-
-    public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
-                                                                             INT32,
-                                                                             "Broker id of the follower. For normal consumers, use -1."),
-                                                                   new Field("topics",
-                                                                             new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
-                                                                             "Topics to list offsets."));
-
-    public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                        INT32,
-                                                                                        "Topic partition id."),
-                                                                              new Field("error_code", INT16),
-                                                                              new Field("offsets",
-                                                                                        new ArrayOf(INT64),
-                                                                                        "A list of offsets."));
-
-    public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                          new Field("partition_responses",
-                                                                                    new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
-
-    public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
-    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
-
-    /* Fetch api */
-    public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                 INT32,
-                                                                                 "Topic partition id."),
-                                                                       new Field("fetch_offset",
-                                                                                 INT64,
-                                                                                 "Message offset."),
-                                                                       new Field("max_bytes",
-                                                                                 INT32,
-                                                                                 "Maximum bytes to fetch."));
-
-    public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
-                                                                   new Field("partitions",
-                                                                             new ArrayOf(FETCH_REQUEST_PARTITION_V0),
-                                                                             "Partitions to fetch."));
-
-    public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
-                                                                       INT32,
-                                                                       "Broker id of the follower. For normal consumers, use -1."),
-                                                             new Field("max_wait_time",
-                                                                       INT32,
-                                                                       "Maximum time in ms to wait for the response."),
-                                                             new Field("min_bytes",
-                                                                       INT32,
-                                                                       "Minimum bytes to accumulate in the response."),
-                                                             new Field("topics",
-                                                                       new ArrayOf(FETCH_REQUEST_TOPIC_V0),
-                                                                       "Topics to fetch."));
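-
-    /* min_bytes and max_wait_time together implement long polling: the broker
-     * holds a fetch request until at least min_bytes of data are available or
-     * max_wait_time ms have passed, whichever comes first. */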
-
-    public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                  INT32,
-                                                                                  "Topic partition id."),
-                                                                        new Field("error_code", INT16),
-                                                                        new Field("high_watermark",
-                                                                                  INT64,
-                                                                                  "Last committed offset."),
-                                                                        new Field("record_set", BYTES));
-
-    public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                    new Field("partition_responses",
-                                                                              new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
-
-    public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                        new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
-
-    /* Consumer metadata api */
-    public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                                   STRING,
-                                                                                   "The consumer group id."));
-
-    public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                          new Field("coordinator",
-                                                                                    BROKER,
-                                                                                    "Host and port information for the coordinator for a consumer group."));
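-
-    /* Consumers send this request first to discover which broker coordinates
-     * their group; offset commits and group membership requests are then
-     * directed to that coordinator. */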
-
-    public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
-    public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
-
-    /* Join group api */
-    public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                            STRING,
-                                                                            "The consumer group id."),
-                                                                  new Field("session_timeout",
-                                                                            INT32,
-                                                                            "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
-                                                                  new Field("topics",
-                                                                            new ArrayOf(STRING),
-                                                                            "An array of topics to subscribe to."),
-                                                                  new Field("consumer_id",
-                                                                            STRING,
-                                                                            "The assigned consumer id or an empty string for a new consumer."),
-                                                                  new Field("partition_assignment_strategy",
-                                                                            STRING,
-                                                                            "The strategy for the coordinator to assign partitions."));
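-
-    /* A consumer joining for the first time sends an empty consumer_id; the
-     * coordinator assigns one and returns it, along with the partition
-     * assignment, in the response below. */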
-
-    public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                         new Field("partitions", new ArrayOf(INT32)));
-    public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                   new Field("group_generation_id",
-                                                                             INT32,
-                                                                             "The generation of the consumer group."),
-                                                                   new Field("consumer_id",
-                                                                             STRING,
-                                                                             "The consumer id assigned by the group coordinator."),
-                                                                   new Field("assigned_partitions",
-                                                                             new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
-    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
-
-    /* Heartbeat api */
-    public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
-                                                                 new Field("group_generation_id",
-                                                                           INT32,
-                                                                           "The generation of the consumer group."),
-                                                                 new Field("consumer_id",
-                                                                           STRING,
-                                                                           "The consumer id assigned by the group coordinator."));
-
-    public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
-
-    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
-    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
-
-    /* an array of all requests and responses with all schema versions */
-    public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
-    public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
-
-    /* the latest version of each api */
-    public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
-
-    static {
-        REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
-        REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
-        REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
-        REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
-        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
-        REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
-        REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
-        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
-        REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
-        REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
-        REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
-        REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
-
-        RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
-        RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
-        RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
-        RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
-        RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
-        RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
-        RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
-        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
-        RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
-        RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
-        RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
-        RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
-
-        /* set the maximum version of each api */
-        for (ApiKeys api : ApiKeys.values())
-            CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
-
-        /* sanity check that we have the same number of request and response versions for each api */
-        for (ApiKeys api : ApiKeys.values())
-            if (REQUESTS[api.id].length != RESPONSES[api.id].length)
-                throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
-                        + " but " + RESPONSES[api.id].length + " response versions.");
-    }
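-
-    /* Illustrative usage sketch (assuming the Struct helper from the
-     * companion protocol.types package): a request is modeled as a Struct
-     * over one of the schemas above and serialized to a ByteBuffer.
-     *
-     *   Struct heartbeat = new Struct(Protocol.HEARTBEAT_REQUEST_V0);
-     *   heartbeat.set("group_id", "my-group");
-     *   heartbeat.set("group_generation_id", 1);
-     *   heartbeat.set("consumer_id", "consumer-1");
-     *
-     *   ByteBuffer buffer = ByteBuffer.allocate(heartbeat.sizeOf());
-     *   heartbeat.writeTo(buffer);
-     *   buffer.flip();
-     *
-     *   Struct parsed = (Struct) Protocol.HEARTBEAT_REQUEST_V0.read(buffer);
-     */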
-
-}