You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:20 UTC

[03/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java
deleted file mode 100644
index 94f55a6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.record;
-
-import org.apache.kafka.copied.common.KafkaException;
-import org.apache.kafka.copied.common.utils.AbstractIterator;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-
-/**
- * A {@link Records} implementation backed by a ByteBuffer.
- */
-public class MemoryRecords implements Records {
-
-    private final Compressor compressor;
-    private final int capacity;
-    private final int sizeLimit;
-    private ByteBuffer buffer;
-    private boolean writable;
-
-    // Construct a writable memory records
-    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
-        this.writable = writable;
-        this.capacity = buffer.capacity();
-        this.sizeLimit = sizeLimit;
-        if (this.writable) {
-            this.buffer = null;
-            this.compressor = new Compressor(buffer, type);
-        } else {
-            this.buffer = buffer;
-            this.compressor = null;
-        }
-    }
-
-    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) {
-        return new MemoryRecords(buffer, type, true, capacity);
-    }
-
-    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
-        return emptyRecords(buffer, type, buffer.capacity());
-    }
-
-    public static MemoryRecords readableRecords(ByteBuffer buffer) {
-        return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
-    }
-
-    /**
-     * Append the given record and offset to the buffer
-     */
-    public void append(long offset, Record record) {
-        if (!writable)
-            throw new IllegalStateException("Memory records is not writable");
-
-        int size = record.size();
-        compressor.putLong(offset);
-        compressor.putInt(size);
-        compressor.put(record.buffer());
-        compressor.recordWritten(size + LOG_OVERHEAD);
-        record.buffer().rewind();
-    }
-
-    /**
-     * Append a new record and offset to the buffer
-     */
-    public void append(long offset, byte[] key, byte[] value) {
-        if (!writable)
-            throw new IllegalStateException("Memory records is not writable");
-
-        int size = Record.recordSize(key, value);
-        compressor.putLong(offset);
-        compressor.putInt(size);
-        compressor.putRecord(key, value);
-        compressor.recordWritten(size + LOG_OVERHEAD);
-    }
-
-    /**
-     * Check if we have room for a new record containing the given key/value pair
-     * 
-     * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
-     * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
-     * re-allocation in the underlying byte buffer stream.
-     * 
-     * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
-     * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
-     * capacity will be the message size, but the size limit will still be the batch size), and when the records' size
-     * has exceed this limit we also mark this record as full.
-     */
-    public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + LOG_OVERHEAD +
-                                                 Record.recordSize(key, value) &&
-               this.sizeLimit >= this.compressor.estimatedBytesWritten();
-    }
-
-    public boolean isFull() {
-        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
-               this.sizeLimit <= this.compressor.estimatedBytesWritten();
-    }
-
-    /**
-     * Close this batch for no more appends
-     */
-    public void close() {
-        if (writable) {
-            compressor.close();
-            writable = false;
-            buffer = compressor.buffer();
-        }
-    }
-
-    /** Write the records in this set to the given channel */
-    public int writeTo(GatheringByteChannel channel) throws IOException {
-        return channel.write(buffer);
-    }
-
-    /**
-     * The size of this record set
-     */
-    public int sizeInBytes() {
-        return compressor.buffer().position();
-    }
-
-    /**
-     * The compression rate of this record set
-     */
-    public double compressionRate() {
-        if (compressor == null)
-            return 1.0;
-        else
-            return compressor.compressionRate();
-    }
-
-    /**
-     * Return the capacity of the buffer
-     */
-    public int capacity() {
-        return this.capacity;
-    }
-
-    /**
-     * Get the byte buffer that backs this records instance
-     */
-    public ByteBuffer buffer() {
-        return buffer.duplicate();
-    }
-
-    /**
-     * Return a flipped duplicate of the closed buffer to reading records
-     */
-    public ByteBuffer flip() {
-        if (writable)
-            throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
-
-        return (ByteBuffer) buffer.flip();
-    }
-
-    @Override
-    public Iterator<LogEntry> iterator() {
-        ByteBuffer copy = this.buffer.duplicate();
-        return new RecordsIterator(copy, CompressionType.NONE, false);
-    }
-    
-    @Override
-    public String toString() {
-        Iterator<LogEntry> iter = iterator();
-        StringBuilder builder = new StringBuilder();
-        builder.append('[');
-        while (iter.hasNext()) {
-            LogEntry entry = iter.next();
-            builder.append('(');
-            builder.append("offset=");
-            builder.append(entry.offset());
-            builder.append(",");
-            builder.append("record=");
-            builder.append(entry.record());
-            builder.append(")");
-        }
-        builder.append(']');
-        return builder.toString();
-    }
-
-    public static class RecordsIterator extends AbstractIterator<LogEntry> {
-        private final ByteBuffer buffer;
-        private final DataInputStream stream;
-        private final CompressionType type;
-        private final boolean shallow;
-        private RecordsIterator innerIter;
-
-        public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
-            this.type = type;
-            this.buffer = buffer;
-            this.shallow = shallow;
-            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
-        }
-
-        /*
-         * Read the next record from the buffer.
-         * 
-         * Note that in the compressed message set, each message value size is set as the size of the un-compressed
-         * version of the message value, so when we do de-compression allocating an array of the specified size for
-         * reading compressed value data is sufficient.
-         */
-        @Override
-        protected LogEntry makeNext() {
-            if (innerDone()) {
-                try {
-                    // read the offset
-                    long offset = stream.readLong();
-                    // read record size
-                    int size = stream.readInt();
-                    if (size < 0)
-                        throw new IllegalStateException("Record with size " + size);
-                    // read the record, if compression is used we cannot depend on size
-                    // and hence has to do extra copy
-                    ByteBuffer rec;
-                    if (type == CompressionType.NONE) {
-                        rec = buffer.slice();
-                        int newPos = buffer.position() + size;
-                        if (newPos > buffer.limit())
-                            return allDone();
-                        buffer.position(newPos);
-                        rec.limit(size);
-                    } else {
-                        byte[] recordBuffer = new byte[size];
-                        stream.readFully(recordBuffer, 0, size);
-                        rec = ByteBuffer.wrap(recordBuffer);
-                    }
-                    LogEntry entry = new LogEntry(offset, new Record(rec));
-
-                    // decide whether to go shallow or deep iteration if it is compressed
-                    CompressionType compression = entry.record().compressionType();
-                    if (compression == CompressionType.NONE || shallow) {
-                        return entry;
-                    } else {
-                        // init the inner iterator with the value payload of the message,
-                        // which will de-compress the payload to a set of messages;
-                        // since we assume nested compression is not allowed, the deep iterator
-                        // would not try to further decompress underlying messages
-                        ByteBuffer value = entry.record().value();
-                        innerIter = new RecordsIterator(value, compression, true);
-                        return innerIter.next();
-                    }
-                } catch (EOFException e) {
-                    return allDone();
-                } catch (IOException e) {
-                    throw new KafkaException(e);
-                }
-            } else {
-                return innerIter.next();
-            }
-        }
-
-        private boolean innerDone() {
-            return innerIter == null || !innerIter.hasNext();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java
deleted file mode 100644
index 54b4084..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.record;
-
-import org.apache.kafka.copied.common.utils.Crc32;
-import org.apache.kafka.copied.common.utils.Utils;
-
-import java.nio.ByteBuffer;
-
-
-/**
- * A record: a serialized key and value along with the associated CRC and other fields
- */
-public final class Record {
-
-    /**
-     * The current offset and size for all the fixed-length fields
-     */
-    public static final int CRC_OFFSET = 0;
-    public static final int CRC_LENGTH = 4;
-    public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
-    public static final int MAGIC_LENGTH = 1;
-    public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
-    public static final int ATTRIBUTE_LENGTH = 1;
-    public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
-    public static final int KEY_SIZE_LENGTH = 4;
-    public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
-    public static final int VALUE_SIZE_LENGTH = 4;
-
-    /**
-     * The size for the record header
-     */
-    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
-
-    /**
-     * The amount of overhead bytes in a record
-     */
-    public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
-
-    /**
-     * The current "magic" value
-     */
-    public static final byte CURRENT_MAGIC_VALUE = 0;
-
-    /**
-     * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
-     * compression
-     */
-    public static final int COMPRESSION_CODEC_MASK = 0x07;
-
-    /**
-     * Compression code for uncompressed records
-     */
-    public static final int NO_COMPRESSION = 0;
-
-    private final ByteBuffer buffer;
-
-    public Record(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    /**
-     * A constructor to create a LogRecord. If the record's compression type is not none, then
-     * its value payload should be already compressed with the specified type; the constructor
-     * would always write the value payload as is and will not do the compression itself.
-     * 
-     * @param key The key of the record (null, if none)
-     * @param value The record value
-     * @param type The compression type used on the contents of the record (if any)
-     * @param valueOffset The offset into the payload array used to extract payload
-     * @param valueSize The size of the payload to use
-     */
-    public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
-        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
-                value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
-        write(this.buffer, key, value, type, valueOffset, valueSize);
-        this.buffer.rewind();
-    }
-
-    public Record(byte[] key, byte[] value, CompressionType type) {
-        this(key, value, type, 0, -1);
-    }
-
-    public Record(byte[] value, CompressionType type) {
-        this(null, value, type);
-    }
-
-    public Record(byte[] key, byte[] value) {
-        this(key, value, CompressionType.NONE);
-    }
-
-    public Record(byte[] value) {
-        this(null, value, CompressionType.NONE);
-    }
-
-    // Write a record to the buffer, if the record's compression type is none, then
-    // its value payload should be already compressed with the specified type
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
-        // construct the compressor with compression type none since this function will not do any
-        //compression according to the input type, it will just write the record's payload as is
-        Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
-        compressor.putRecord(key, value, type, valueOffset, valueSize);
-    }
-
-    public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
-        // write crc
-        compressor.putInt((int) (crc & 0xffffffffL));
-        // write magic value
-        compressor.putByte(CURRENT_MAGIC_VALUE);
-        // write attributes
-        compressor.putByte(attributes);
-        // write the key
-        if (key == null) {
-            compressor.putInt(-1);
-        } else {
-            compressor.putInt(key.length);
-            compressor.put(key, 0, key.length);
-        }
-        // write the value
-        if (value == null) {
-            compressor.putInt(-1);
-        } else {
-            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            compressor.putInt(size);
-            compressor.put(value, valueOffset, size);
-        }
-    }
-
-    public static int recordSize(byte[] key, byte[] value) {
-        return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
-    }
-
-    public static int recordSize(int keySize, int valueSize) {
-        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
-    }
-
-    public ByteBuffer buffer() {
-        return this.buffer;
-    }
-
-    public static byte computeAttributes(CompressionType type) {
-        byte attributes = 0;
-        if (type.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
-        return attributes;
-    }
-
-    /**
-     * Compute the checksum of the record from the record contents
-     */
-    public static long computeChecksum(ByteBuffer buffer, int position, int size) {
-        Crc32 crc = new Crc32();
-        crc.update(buffer.array(), buffer.arrayOffset() + position, size);
-        return crc.getValue();
-    }
-
-    /**
-     * Compute the checksum of the record from the attributes, key and value payloads
-     */
-    public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
-        Crc32 crc = new Crc32();
-        crc.update(CURRENT_MAGIC_VALUE);
-        byte attributes = 0;
-        if (type.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
-        crc.update(attributes);
-        // update for the key
-        if (key == null) {
-            crc.updateInt(-1);
-        } else {
-            crc.updateInt(key.length);
-            crc.update(key, 0, key.length);
-        }
-        // update for the value
-        if (value == null) {
-            crc.updateInt(-1);
-        } else {
-            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            crc.updateInt(size);
-            crc.update(value, valueOffset, size);
-        }
-        return crc.getValue();
-    }
-
-
-    /**
-     * Compute the checksum of the record from the record contents
-     */
-    public long computeChecksum() {
-        return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
-    }
-
-    /**
-     * Retrieve the previously computed CRC for this record
-     */
-    public long checksum() {
-        return Utils.readUnsignedInt(buffer, CRC_OFFSET);
-    }
-
-    /**
-     * Returns true if the crc stored with the record matches the crc computed off the record contents
-     */
-    public boolean isValid() {
-        return checksum() == computeChecksum();
-    }
-
-    /**
-     * Throw an InvalidRecordException if isValid is false for this record
-     */
-    public void ensureValid() {
-        if (!isValid())
-            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
-                                             + ", computed crc = "
-                                             + computeChecksum()
-                                             + ")");
-    }
-
-    /**
-     * The complete serialized size of this record in bytes (including crc, header attributes, etc)
-     */
-    public int size() {
-        return buffer.limit();
-    }
-
-    /**
-     * The length of the key in bytes
-     */
-    public int keySize() {
-        return buffer.getInt(KEY_SIZE_OFFSET);
-    }
-
-    /**
-     * Does the record have a key?
-     */
-    public boolean hasKey() {
-        return keySize() >= 0;
-    }
-
-    /**
-     * The position where the value size is stored
-     */
-    private int valueSizeOffset() {
-        return KEY_OFFSET + Math.max(0, keySize());
-    }
-
-    /**
-     * The length of the value in bytes
-     */
-    public int valueSize() {
-        return buffer.getInt(valueSizeOffset());
-    }
-
-    /**
-     * The magic version of this record
-     */
-    public byte magic() {
-        return buffer.get(MAGIC_OFFSET);
-    }
-
-    /**
-     * The attributes stored with this record
-     */
-    public byte attributes() {
-        return buffer.get(ATTRIBUTES_OFFSET);
-    }
-
-    /**
-     * The compression type used with this record
-     */
-    public CompressionType compressionType() {
-        return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
-    }
-
-    /**
-     * A ByteBuffer containing the value of this record
-     */
-    public ByteBuffer value() {
-        return sliceDelimited(valueSizeOffset());
-    }
-
-    /**
-     * A ByteBuffer containing the message key
-     */
-    public ByteBuffer key() {
-        return sliceDelimited(KEY_SIZE_OFFSET);
-    }
-
-    /**
-     * Read a size-delimited byte buffer starting at the given offset
-     */
-    private ByteBuffer sliceDelimited(int start) {
-        int size = buffer.getInt(start);
-        if (size < 0) {
-            return null;
-        } else {
-            ByteBuffer b = buffer.duplicate();
-            b.position(start + 4);
-            b = b.slice();
-            b.limit(size);
-            b.rewind();
-            return b;
-        }
-    }
-
-    public String toString() {
-        return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
-                magic(),
-                attributes(),
-                compressionType(),
-                checksum(),
-                key() == null ? 0 : key().limit(),
-                value() == null ? 0 : value().limit());
-    }
-
-    public boolean equals(Object other) {
-        if (this == other)
-            return true;
-        if (other == null)
-            return false;
-        if (!other.getClass().equals(Record.class))
-            return false;
-        Record record = (Record) other;
-        return this.buffer.equals(record.buffer);
-    }
-
-    public int hashCode() {
-        return buffer.hashCode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java
deleted file mode 100644
index 06f6668..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.record;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
- * for the in-memory representation.
- */
-public interface Records extends Iterable<LogEntry> {
-
-    int SIZE_LENGTH = 4;
-    int OFFSET_LENGTH = 8;
-    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
-
-    /**
-     * Write these records to the given channel
-     * @param channel The channel to write to
-     * @return The number of bytes written
-     * @throws IOException If the write fails.
-     */
-    public int writeTo(GatheringByteChannel channel) throws IOException;
-
-    /**
-     * The size of these records in bytes
-     */
-    public int sizeInBytes();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java
deleted file mode 100644
index 53972c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public abstract class AbstractRequest extends AbstractRequestResponse {
-
-    public AbstractRequest(Struct struct) {
-        super(struct);
-    }
-
-    /**
-     * Get an error response for a request for a given api version
-     */
-    public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
-
-    /**
-     * Factory method for getting a request object based on ApiKey ID and a buffer
-     */
-    public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
-        switch (ApiKeys.forId(requestId)) {
-            case PRODUCE:
-                return ProduceRequest.parse(buffer, versionId);
-            case FETCH:
-                return FetchRequest.parse(buffer, versionId);
-            case LIST_OFFSETS:
-                return ListOffsetRequest.parse(buffer, versionId);
-            case METADATA:
-                return MetadataRequest.parse(buffer, versionId);
-            case OFFSET_COMMIT:
-                return OffsetCommitRequest.parse(buffer, versionId);
-            case OFFSET_FETCH:
-                return OffsetFetchRequest.parse(buffer, versionId);
-            case CONSUMER_METADATA:
-                return ConsumerMetadataRequest.parse(buffer, versionId);
-            case JOIN_GROUP:
-                return JoinGroupRequest.parse(buffer, versionId);
-            case HEARTBEAT:
-                return HeartbeatRequest.parse(buffer, versionId);
-            default:
-                return null;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java
deleted file mode 100644
index b070fec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public abstract class AbstractRequestResponse {
-    protected final Struct struct;
-
-
-    public AbstractRequestResponse(Struct struct) {
-        this.struct = struct;
-    }
-
-    public Struct toStruct() {
-        return struct;
-    }
-
-    /**
-     * Get the serialized size of this object
-     */
-    public int sizeOf() {
-        return struct.sizeOf();
-    }
-
-    /**
-     * Write this object to a buffer
-     */
-    public void writeTo(ByteBuffer buffer) {
-        struct.writeTo(buffer);
-    }
-
-    @Override
-    public String toString() {
-        return struct.toString();
-    }
-
-    @Override
-    public int hashCode() {
-        return struct.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        AbstractRequestResponse other = (AbstractRequestResponse) obj;
-        return struct.equals(other.struct);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java
deleted file mode 100644
index c5d65b2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-
-    private final String groupId;
-
-    public ConsumerMetadataRequest(String groupId) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        this.groupId = groupId;
-    }
-
-    public ConsumerMetadataRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
-    }
-
-    public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
-        return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java
deleted file mode 100644
index 68fd4cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String COORDINATOR_KEY_NAME = "coordinator";
-
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    private final short errorCode;
-    private final Node node;
-
-    public ConsumerMetadataResponse(short errorCode, Node node) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
-        this.errorCode = errorCode;
-        this.node = node;
-    }
-
-    public ConsumerMetadataResponse(Struct struct) {
-        super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
-        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-        String host = broker.getString(HOST_KEY_NAME);
-        int port = broker.getInt(PORT_KEY_NAME);
-        node = new Node(nodeId, host, port);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public Node node() {
-        return node;
-    }
-
-    public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
-        return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java
deleted file mode 100644
index 78069ae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FetchRequest extends AbstractRequest {
-    
-    public static final int CONSUMER_REPLICA_ID = -1;
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
-    private static final String REPLICA_ID_KEY_NAME = "replica_id";
-    private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
-    private static final String MIN_BYTES_KEY_NAME = "min_bytes";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
-    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
-
-    private final int replicaId;
-    private final int maxWait;
-    private final int minBytes;
-    private final Map<TopicPartition, PartitionData> fetchData;
-
-    public static final class PartitionData {
-        public final long offset;
-        public final int maxBytes;
-
-        public PartitionData(long offset, int maxBytes) {
-            this.offset = offset;
-            this.maxBytes = maxBytes;
-        }
-    }
-
-    /**
-     * Create a non-replica fetch request
-     */
-    public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
-    }
-
-    /**
-     * Create a replica fetch request
-     */
-    public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
-
-        struct.set(REPLICA_ID_KEY_NAME, replicaId);
-        struct.set(MAX_WAIT_KEY_NAME, maxWait);
-        struct.set(MIN_BYTES_KEY_NAME, minBytes);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-        this.replicaId = replicaId;
-        this.maxWait = maxWait;
-        this.minBytes = minBytes;
-        this.fetchData = fetchData;
-    }
-
-    public FetchRequest(Struct struct) {
-        super(struct);
-        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
-        maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
-        minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
-        fetchData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
-                int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, maxBytes);
-                fetchData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
-
-        for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
-            FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
-                    FetchResponse.INVALID_HIGHWATERMARK,
-                    FetchResponse.EMPTY_RECORD_SET);
-            responseData.put(entry.getKey(), partitionResponse);
-        }
-
-        switch (versionId) {
-            case 0:
-                return new FetchResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
-        }
-    }
-
-    public int replicaId() {
-        return replicaId;
-    }
-
-    public int maxWait() {
-        return maxWait;
-    }
-
-    public int minBytes() {
-        return minBytes;
-    }
-
-    public Map<TopicPartition, PartitionData> fetchData() {
-        return fetchData;
-    }
-
-    public static FetchRequest parse(ByteBuffer buffer, int versionId) {
-        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
-    }
-
-    public static FetchRequest parse(ByteBuffer buffer) {
-        return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java
deleted file mode 100644
index fc2c44e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FetchResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     *  OFFSET_OUT_OF_RANGE (1)
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)
-     *  NOT_LEADER_FOR_PARTITION (6)
-     *  REPLICA_NOT_AVAILABLE (9)
-     *  UNKNOWN (-1)
-     */
-
-    private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
-    private static final String RECORD_SET_KEY_NAME = "record_set";
-
-    public static final long INVALID_HIGHWATERMARK = -1L;
-    public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
-
-    private final Map<TopicPartition, PartitionData> responseData;
-
-    public static final class PartitionData {
-        public final short errorCode;
-        public final long highWatermark;
-        public final ByteBuffer recordSet;
-
-        public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
-            this.errorCode = errorCode;
-            this.highWatermark = highWatermark;
-            this.recordSet = recordSet;
-        }
-    }
-
-    public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
-                partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
-                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
-
-    public FetchResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
-                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
-                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    public Map<TopicPartition, PartitionData> responseData() {
-        return responseData;
-    }
-
-    public static FetchResponse parse(ByteBuffer buffer) {
-        return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java
deleted file mode 100644
index 7cb7699..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class HeartbeatRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-
-    private final String groupId;
-    private final int groupGenerationId;
-    private final String consumerId;
-
-    public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        this.groupId = groupId;
-        this.groupGenerationId = groupGenerationId;
-        this.consumerId = consumerId;
-    }
-
-    public HeartbeatRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new HeartbeatResponse(Errors.forException(e).code());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public int groupGenerationId() {
-        return groupGenerationId;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
-        return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
-    }
-
-    public static HeartbeatRequest parse(ByteBuffer buffer) {
-        return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java
deleted file mode 100644
index 12c5ac8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class HeartbeatResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
-     * ILLEGAL_GENERATION (22)
-     * UNKNOWN_CONSUMER_ID (25)
-     */
-
-    private final short errorCode;
-    public HeartbeatResponse(short errorCode) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        this.errorCode = errorCode;
-    }
-
-    public HeartbeatResponse(Struct struct) {
-        super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public static HeartbeatResponse parse(ByteBuffer buffer) {
-        return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java
deleted file mode 100644
index 011bb9d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class JoinGroupRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
-
-    public static final String UNKNOWN_CONSUMER_ID = "";
-
-    private final String groupId;
-    private final int sessionTimeout;
-    private final List<String> topics;
-    private final String consumerId;
-    private final String strategy;
-
-    public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        struct.set(STRATEGY_KEY_NAME, strategy);
-        this.groupId = groupId;
-        this.sessionTimeout = sessionTimeout;
-        this.topics = topics;
-        this.consumerId = consumerId;
-        this.strategy = strategy;
-    }
-
-    public JoinGroupRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
-        Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList<String>();
-        for (Object topic: topicsArray)
-            topics.add((String) topic);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-        strategy = struct.getString(STRATEGY_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new JoinGroupResponse(
-                        Errors.forException(e).code(),
-                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                        JoinGroupResponse.UNKNOWN_CONSUMER_ID,
-                        Collections.<TopicPartition>emptyList());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public int sessionTimeout() {
-        return sessionTimeout;
-    }
-
-    public List<String> topics() {
-        return topics;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public String strategy() {
-        return strategy;
-    }
-
-    public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
-    }
-
-    public static JoinGroupRequest parse(ByteBuffer buffer) {
-        return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java
deleted file mode 100644
index 5855e94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class JoinGroupResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
-     * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
-     * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
-     * UNKNOWN_CONSUMER_ID (25)
-     * INVALID_SESSION_TIMEOUT (26)
-     */
-
-    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    public static final int UNKNOWN_GENERATION_ID = -1;
-    public static final String UNKNOWN_CONSUMER_ID = "";
-
-    private final short errorCode;
-    private final int generationId;
-    private final String consumerId;
-    private final List<TopicPartition> assignedPartitions;
-
-    public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
-
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
-            Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
-
-        this.errorCode = errorCode;
-        this.generationId = generationId;
-        this.consumerId = consumerId;
-        this.assignedPartitions = assignedPartitions;
-    }
-
-    public JoinGroupResponse(Struct struct) {
-        super(struct);
-        assignedPartitions = new ArrayList<TopicPartition>();
-        for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
-            for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
-                assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
-        }
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-        generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public int generationId() {
-        return generationId;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public List<TopicPartition> assignedPartitions() {
-        return assignedPartitions;
-    }
-
-    public static JoinGroupResponse parse(ByteBuffer buffer) {
-        return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java
deleted file mode 100644
index 299de92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ListOffsetRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
-    private static final String REPLICA_ID_KEY_NAME = "replica_id";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String TIMESTAMP_KEY_NAME = "timestamp";
-    private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
-
-    private final int replicaId;
-    private final Map<TopicPartition, PartitionData> offsetData;
-
-    public static final class PartitionData {
-        public final long timestamp;
-        public final int maxNumOffsets;
-
-        public PartitionData(long timestamp, int maxNumOffsets) {
-            this.timestamp = timestamp;
-            this.maxNumOffsets = maxNumOffsets;
-        }
-    }
-    
-    public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
-        this(-1, offsetData);
-    }
-
-    public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
-
-        struct.set(REPLICA_ID_KEY_NAME, replicaId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData offsetPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
-                partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-        this.replicaId = replicaId;
-        this.offsetData = offsetData;
-    }
-
-    public ListOffsetRequest(Struct struct) {
-        super(struct);
-        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
-        offsetData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
-                int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
-                PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets);
-                offsetData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
-
-        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
-            ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
-            responseData.put(entry.getKey(), partitionResponse);
-        }
-
-        switch (versionId) {
-            case 0:
-                return new ListOffsetResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
-        }
-    }
-
-    public int replicaId() {
-        return replicaId;
-    }
-
-    public Map<TopicPartition, PartitionData> offsetData() {
-        return offsetData;
-    }
-
-    public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
-        return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
-    }
-
-    public static ListOffsetRequest parse(ByteBuffer buffer) {
-        return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java
deleted file mode 100644
index d7de2df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ListOffsetResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)
-     *  NOT_LEADER_FOR_PARTITION (6)
-     *  UNKNOWN (-1)
-     */
-
-    private static final String OFFSETS_KEY_NAME = "offsets";
-
-    private final Map<TopicPartition, PartitionData> responseData;
-
-    public static final class PartitionData {
-        public final short errorCode;
-        public final List<Long> offsets;
-
-        public PartitionData(short errorCode, List<Long> offsets) {
-            this.errorCode = errorCode;
-            this.offsets = offsets;
-        }
-    }
-
-    public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData offsetPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode);
-                partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
-
-    public ListOffsetResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
-                List<Long> offsetsList = new ArrayList<Long>();
-                for (Object offset: offsets)
-                    offsetsList.add((Long) offset);
-                PartitionData partitionData = new PartitionData(errorCode, offsetsList);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    public Map<TopicPartition, PartitionData> responseData() {
-        return responseData;
-    }
-
-    public static ListOffsetResponse parse(ByteBuffer buffer) {
-        return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}