Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:32 UTC

[15/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Compressor.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Compressor.java
new file mode 100644
index 0000000..b1b537c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Compressor.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class Compressor {
+
+    private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
+    private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
+
+    private static final float[] TYPE_TO_RATE;
+
+    static {
+        int maxTypeId = -1;
+        for (CompressionType type : CompressionType.values())
+            maxTypeId = Math.max(maxTypeId, type.id);
+        TYPE_TO_RATE = new float[maxTypeId + 1];
+        for (CompressionType type : CompressionType.values()) {
+            TYPE_TO_RATE[type.id] = type.rate;
+        }
+    }
+
+    private final CompressionType type;
+    private final DataOutputStream appendStream;
+    private final ByteBufferOutputStream bufferStream;
+    private final int initPos;
+
+    public long writtenUncompressed;
+    public long numRecords;
+
+    public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
+        this.type = type;
+        this.initPos = buffer.position();
+
+        this.numRecords = 0;
+        this.writtenUncompressed = 0;
+
+        if (type != CompressionType.NONE) {
+            // for compressed records, leave space for the header and the shallow message metadata
+            // and move the starting position to the value payload offset
+            buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
+        }
+
+        // create the stream
+        bufferStream = new ByteBufferOutputStream(buffer);
+        appendStream = wrapForOutput(bufferStream, type, blockSize);
+    }
+
+    public Compressor(ByteBuffer buffer, CompressionType type) {
+        this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
+    }
+
+    public ByteBuffer buffer() {
+        return bufferStream.buffer();
+    }
+    
+    public double compressionRate() {
+        ByteBuffer buffer = bufferStream.buffer();
+        if (this.writtenUncompressed == 0)
+            return 1.0;
+        else
+            return (double) buffer.position() / this.writtenUncompressed;
+    }
+
+    public void close() {
+        try {
+            appendStream.close();
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        if (type != CompressionType.NONE) {
+            ByteBuffer buffer = bufferStream.buffer();
+            int pos = buffer.position();
+            // write the header; as the end offset, write (number of records - 1)
+            buffer.position(initPos);
+            buffer.putLong(numRecords - 1);
+            buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
+            // write the shallow message (the crc and value size are not correct yet)
+            Record.write(buffer, null, null, type, 0, -1);
+            // compute and fill the value size
+            int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
+            buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize);
+            // compute and fill the crc at the beginning of the message
+            long crc = Record.computeChecksum(buffer,
+                    initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
+                    pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
+            Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+            // reset the position
+            buffer.position(pos);
+
+            // update the compression ratio
+            float compressionRate = (float) buffer.position() / this.writtenUncompressed;
+            TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+                compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
+        }
+    }
+
+    // Note that the write operations below should never encounter IO exceptions,
+    // since the underlying ByteBufferOutputStream does not throw IOException;
+    // if one occurs anyway, it is wrapped and re-thrown as a KafkaException.
+
+    public void putLong(final long value) {
+        try {
+            appendStream.writeLong(value);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void putInt(final int value) {
+        try {
+            appendStream.writeInt(value);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void put(final ByteBuffer buffer) {
+        try {
+            appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void putByte(final byte value) {
+        try {
+            appendStream.write(value);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void put(final byte[] bytes, final int offset, final int len) {
+        try {
+            appendStream.write(bytes, offset, len);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        // put a record as un-compressed into the underlying stream
+        long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize);
+        byte attributes = Record.computeAttributes(type);
+        putRecord(crc, attributes, key, value, valueOffset, valueSize);
+    }
+
+    public void putRecord(byte[] key, byte[] value) {
+        putRecord(key, value, CompressionType.NONE, 0, -1);
+    }
+
+    private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
+        Record.write(this, crc, attributes, key, value, valueOffset, valueSize);
+    }
+
+    public void recordWritten(int size) {
+        numRecords += 1;
+        writtenUncompressed += size;
+    }
+
+    public long estimatedBytesWritten() {
+        if (type == CompressionType.NONE) {
+            return bufferStream.buffer().position();
+        } else {
+            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
+            return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+        }
+    }
+
+    // the following two functions also need to be public since they are used during MemoryRecords iteration
+
+    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+        try {
+            switch (type) {
+                case NONE:
+                    return new DataOutputStream(buffer);
+                case GZIP:
+                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+                case SNAPPY:
+                    // dynamically load the snappy class to avoid runtime dependency
+                    // on snappy if we are not using it
+                    try {
+                        Class<?> outputStreamClass = Class.forName("org.xerial.snappy.SnappyOutputStream");
+                        OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
+                            .newInstance(buffer, bufferSize);
+                        return new DataOutputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                case LZ4:
+                    try {
+                        Class<?> outputStreamClass = Class.forName("org.apache.flink.kafka_backport.common.record.KafkaLZ4BlockOutputStream");
+                        OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
+                            .newInstance(buffer);
+                        return new DataOutputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                default:
+                    throw new IllegalArgumentException("Unknown compression type: " + type);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) {
+        try {
+            switch (type) {
+                case NONE:
+                    return new DataInputStream(buffer);
+                case GZIP:
+                    return new DataInputStream(new GZIPInputStream(buffer));
+                case SNAPPY:
+                    // dynamically load the snappy class to avoid runtime dependency
+                    // on snappy if we are not using it
+                    try {
+                        Class<?> inputStreamClass = Class.forName("org.xerial.snappy.SnappyInputStream");
+                        InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
+                            .newInstance(buffer);
+                        return new DataInputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                case LZ4:
+                    // dynamically load LZ4 class to avoid runtime dependency
+                    try {
+                        Class<?> inputStreamClass = Class.forName("org.apache.flink.kafka_backport.common.record.KafkaLZ4BlockInputStream");
+                        InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
+                            .newInstance(buffer);
+                        return new DataInputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                default:
+                    throw new IllegalArgumentException("Unknown compression type: " + type);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+}
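
A minimal usage sketch for the backported Compressor above, mirroring what
MemoryRecords.append(...) later in this commit does. The sketch is not part of
the commit itself; the buffer size and record contents are made up for
illustration.

    import java.nio.ByteBuffer;
    import org.apache.flink.kafka_backport.common.record.CompressionType;
    import org.apache.flink.kafka_backport.common.record.Compressor;
    import org.apache.flink.kafka_backport.common.record.Record;
    import org.apache.flink.kafka_backport.common.record.Records;

    public class CompressorSketch {
        public static void main(String[] args) {
            // Write one GZIP-compressed record into a heap buffer.
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            Compressor compressor = new Compressor(buffer, CompressionType.GZIP);

            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            // total record size: fixed record overhead plus key and value bytes
            int size = Record.RECORD_OVERHEAD + key.length + value.length;

            compressor.putLong(0L);                // offset of the record
            compressor.putInt(size);               // size of the record
            compressor.putRecord(key, value);      // crc, attributes, key, value
            compressor.recordWritten(size + Records.LOG_OVERHEAD);

            // close() back-fills the wrapper header, value size and crc,
            // and updates the per-codec compression rate estimate.
            compressor.close();
            System.out.println("compression rate: " + compressor.compressionRate());
        }
    }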

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/InvalidRecordException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/InvalidRecordException.java
new file mode 100644
index 0000000..38c26ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/InvalidRecordException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class InvalidRecordException extends RuntimeException {
+
+    private static final long serialVersionUID = 1;
+
+    public InvalidRecordException(String s) {
+        super(s);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockInputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..1049787
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.record;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * 
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ *      Format Spec</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+    public static final String PREMATURE_EOS = "Stream ended prematurely";
+    public static final String NOT_SUPPORTED = "Stream unsupported";
+    public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+    public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+    private final LZ4SafeDecompressor decompressor;
+    private final XXHash32 checksum;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private KafkaLZ4BlockOutputStream.FLG flg;
+    private KafkaLZ4BlockOutputStream.BD bd;
+    private int bufferOffset;
+    private int bufferSize;
+    private boolean finished;
+
+    /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     * 
+     * @param in The stream to decompress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+        super(in);
+        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        readHeader();
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[maxBlockSize];
+        bufferOffset = 0;
+        bufferSize = 0;
+        finished = false;
+    }
+
+    /**
+     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+     * 
+     * @throws IOException
+     */
+    private void readHeader() throws IOException {
+        byte[] header = new byte[KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH];
+
+        // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+        bufferOffset = 6;
+        if (in.read(header, 0, bufferOffset) != bufferOffset) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        if (KafkaLZ4BlockOutputStream.MAGIC != Utils.readUnsignedIntLE(header, bufferOffset - 6)) {
+            throw new IOException(NOT_SUPPORTED);
+        }
+        flg = KafkaLZ4BlockOutputStream.FLG.fromByte(header[bufferOffset - 2]);
+        bd = KafkaLZ4BlockOutputStream.BD.fromByte(header[bufferOffset - 1]);
+        // TODO read uncompressed content size, update flg.validate()
+        // TODO read dictionary id, update flg.validate()
+
+        // check stream descriptor hash
+        byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
+        header[bufferOffset++] = (byte) in.read();
+        if (hash != header[bufferOffset - 1]) {
+            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        }
+    }
+
+    /**
+     * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, and writes the
+     * result to a buffer.
+     * 
+     * @throws IOException
+     */
+    private void readBlock() throws IOException {
+        int blockSize = Utils.readUnsignedIntLE(in);
+
+        // Check for EndMark
+        if (blockSize == 0) {
+            finished = true;
+            // TODO implement content checksum, update flg.validate()
+            return;
+        } else if (blockSize > maxBlockSize) {
+            throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+        }
+
+        boolean compressed = (blockSize & KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+        byte[] bufferToRead;
+        if (compressed) {
+            bufferToRead = compressedBuffer;
+        } else {
+            blockSize &= ~KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+            bufferToRead = buffer;
+            bufferSize = blockSize;
+        }
+
+        if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        // verify checksum
+        if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+            throw new IOException(BLOCK_HASH_MISMATCH);
+        }
+
+        if (compressed) {
+            try {
+                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+            } catch (LZ4Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        bufferOffset = 0;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+        int value = buffer[bufferOffset++] & 0xFF;
+
+        return value;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+        len = Math.min(len, available());
+        System.arraycopy(buffer, bufferOffset, b, off, len);
+        bufferOffset += len;
+        return len;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (finished) {
+            return 0;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return 0;
+        }
+        n = Math.min(n, available());
+        bufferOffset += n;
+        return n;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return bufferSize - bufferOffset;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new RuntimeException("mark not supported");
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        throw new RuntimeException("reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockOutputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..3007c42
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.record;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * 
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ *      Format Spec</a>
+ */
+public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+    public static final int MAGIC = 0x184D2204;
+    public static final int LZ4_MAX_HEADER_LENGTH = 19;
+    public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+    public static final String CLOSED_STREAM = "The stream is already closed";
+
+    public static final int BLOCKSIZE_64KB = 4;
+    public static final int BLOCKSIZE_256KB = 5;
+    public static final int BLOCKSIZE_1MB = 6;
+    public static final int BLOCKSIZE_4MB = 7;
+
+    private final LZ4Compressor compressor;
+    private final XXHash32 checksum;
+    private final FLG flg;
+    private final BD bd;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private int bufferOffset;
+    private boolean finished;
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     * 
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+     *            every block of data
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+        super(out);
+        compressor = LZ4Factory.fastestInstance().fastCompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        bd = new BD(blockSize);
+        flg = new FLG(blockChecksum);
+        bufferOffset = 0;
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+        finished = false;
+        writeHeader();
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     * 
+     * @param out The stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+        this(out, blockSize, false);
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     * 
+     * @param out The output stream to compress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+        this(out, BLOCKSIZE_64KB);
+    }
+
+    /**
+     * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+     * 
+     * @throws IOException
+     */
+    private void writeHeader() throws IOException {
+        Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+        bufferOffset = 4;
+        buffer[bufferOffset++] = flg.toByte();
+        buffer[bufferOffset++] = bd.toByte();
+        // TODO write uncompressed content size, update flg.validate()
+        // TODO write dictionary id, update flg.validate()
+        // compute checksum on all descriptor fields
+        int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
+        buffer[bufferOffset++] = (byte) hash;
+        // write out frame descriptor
+        out.write(buffer, 0, bufferOffset);
+        bufferOffset = 0;
+    }
+
+    /**
+     * Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
+     * {@link OutputStream}.
+     * 
+     * @throws IOException
+     */
+    private void writeBlock() throws IOException {
+        if (bufferOffset == 0) {
+            return;
+        }
+
+        int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+        byte[] bufferToWrite = compressedBuffer;
+        int compressMethod = 0;
+
+        // Store block uncompressed if compressed length is greater (incompressible)
+        if (compressedLength >= bufferOffset) {
+            bufferToWrite = buffer;
+            compressedLength = bufferOffset;
+            compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+        }
+
+        // Write content
+        Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+        out.write(bufferToWrite, 0, compressedLength);
+
+        // Calculate and write block checksum
+        if (flg.isBlockChecksumSet()) {
+            int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+            Utils.writeUnsignedIntLE(out, hash);
+        }
+        bufferOffset = 0;
+    }
+
+    /**
+     * Similar to the {@link #writeBlock()} method. Writes a 0-length block (without block checksum) to signal the end
+     * of the block stream.
+     * 
+     * @throws IOException
+     */
+    private void writeEndMark() throws IOException {
+        Utils.writeUnsignedIntLE(out, 0);
+        // TODO implement content checksum, update flg.validate()
+        finished = true;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ensureNotFinished();
+        if (bufferOffset == maxBlockSize) {
+            writeBlock();
+        }
+        buffer[bufferOffset++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        ensureNotFinished();
+
+        int bufferRemainingLength = maxBlockSize - bufferOffset;
+        // while the remaining input is larger than the space left in the buffer
+        while (len > bufferRemainingLength) {
+            // fill remaining space in buffer
+            System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+            bufferOffset = maxBlockSize;
+            writeBlock();
+            // compute new offset and length
+            off += bufferRemainingLength;
+            len -= bufferRemainingLength;
+            bufferRemainingLength = maxBlockSize;
+        }
+
+        System.arraycopy(b, off, buffer, bufferOffset, len);
+        bufferOffset += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (!finished) {
+            writeBlock();
+        }
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    /**
+     * A simple state check to ensure the stream is still open.
+     */
+    private void ensureNotFinished() {
+        if (finished) {
+            throw new IllegalStateException(CLOSED_STREAM);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!finished) {
+            writeEndMark();
+            flush();
+            finished = true;
+        }
+        if (out != null) {
+            out.close();
+            out = null;
+        }
+    }
+
+    public static class FLG {
+
+        private static final int VERSION = 1;
+
+        private final int presetDictionary;
+        private final int reserved1;
+        private final int contentChecksum;
+        private final int contentSize;
+        private final int blockChecksum;
+        private final int blockIndependence;
+        private final int version;
+
+        public FLG() {
+            this(false);
+        }
+
+        public FLG(boolean blockChecksum) {
+            this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+        }
+
+        private FLG(int presetDictionary,
+                    int reserved1,
+                    int contentChecksum,
+                    int contentSize,
+                    int blockChecksum,
+                    int blockIndependence,
+                    int version) {
+            this.presetDictionary = presetDictionary;
+            this.reserved1 = reserved1;
+            this.contentChecksum = contentChecksum;
+            this.contentSize = contentSize;
+            this.blockChecksum = blockChecksum;
+            this.blockIndependence = blockIndependence;
+            this.version = version;
+            validate();
+        }
+
+        public static FLG fromByte(byte flg) {
+            int presetDictionary = (flg >>> 0) & 1;
+            int reserved1 = (flg >>> 1) & 1;
+            int contentChecksum = (flg >>> 2) & 1;
+            int contentSize = (flg >>> 3) & 1;
+            int blockChecksum = (flg >>> 4) & 1;
+            int blockIndependence = (flg >>> 5) & 1;
+            int version = (flg >>> 6) & 3;
+
+            return new FLG(presetDictionary,
+                           reserved1,
+                           contentChecksum,
+                           contentSize,
+                           blockChecksum,
+                           blockIndependence,
+                           version);
+        }
+
+        public byte toByte() {
+            return (byte) (((presetDictionary & 1) << 0) | ((reserved1 & 1) << 1) | ((contentChecksum & 1) << 2)
+                    | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
+        }
+
+        private void validate() {
+            if (presetDictionary != 0) {
+                throw new RuntimeException("Preset dictionary is unsupported");
+            }
+            if (reserved1 != 0) {
+                throw new RuntimeException("Reserved1 field must be 0");
+            }
+            if (contentChecksum != 0) {
+                throw new RuntimeException("Content checksum is unsupported");
+            }
+            if (contentSize != 0) {
+                throw new RuntimeException("Content size is unsupported");
+            }
+            if (blockIndependence != 1) {
+                throw new RuntimeException("Dependent block stream is unsupported");
+            }
+            if (version != VERSION) {
+                throw new RuntimeException(String.format("Version %d is unsupported", version));
+            }
+        }
+
+        public boolean isPresetDictionarySet() {
+            return presetDictionary == 1;
+        }
+
+        public boolean isContentChecksumSet() {
+            return contentChecksum == 1;
+        }
+
+        public boolean isContentSizeSet() {
+            return contentSize == 1;
+        }
+
+        public boolean isBlockChecksumSet() {
+            return blockChecksum == 1;
+        }
+
+        public boolean isBlockIndependenceSet() {
+            return blockIndependence == 1;
+        }
+
+        public int getVersion() {
+            return version;
+        }
+    }
+
+    public static class BD {
+
+        private final int reserved2;
+        private final int blockSizeValue;
+        private final int reserved3;
+
+        public BD() {
+            this(0, BLOCKSIZE_64KB, 0);
+        }
+
+        public BD(int blockSizeValue) {
+            this(0, blockSizeValue, 0);
+        }
+
+        private BD(int reserved2, int blockSizeValue, int reserved3) {
+            this.reserved2 = reserved2;
+            this.blockSizeValue = blockSizeValue;
+            this.reserved3 = reserved3;
+            validate();
+        }
+
+        public static BD fromByte(byte bd) {
+            int reserved2 = (bd >>> 0) & 15;
+            int blockMaximumSize = (bd >>> 4) & 7;
+            int reserved3 = (bd >>> 7) & 1;
+
+            return new BD(reserved2, blockMaximumSize, reserved3);
+        }
+
+        private void validate() {
+            if (reserved2 != 0) {
+                throw new RuntimeException("Reserved2 field must be 0");
+            }
+            if (blockSizeValue < 4 || blockSizeValue > 7) {
+                throw new RuntimeException("Block size value must be between 4 and 7");
+            }
+            if (reserved3 != 0) {
+                throw new RuntimeException("Reserved3 field must be 0");
+            }
+        }
+
+        // max block size = 2^(2 * blockSizeValue + 8): 4 -> 64 KB, 5 -> 256 KB, 6 -> 1 MB, 7 -> 4 MB
+        public int getBlockMaximumSize() {
+            return 1 << ((2 * blockSizeValue) + 8);
+        }
+
+        public byte toByte() {
+            return (byte) (((reserved2 & 15) << 0) | ((blockSizeValue & 7) << 4) | ((reserved3 & 1) << 7));
+        }
+    }
+
+}
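
A round-trip sketch for the two LZ4 framing classes added above. This is not
part of the commit; it only assumes the lz4-java (net.jpountz) library is on
the classpath, which the imports of these classes already require.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.flink.kafka_backport.common.record.KafkaLZ4BlockInputStream;
    import org.apache.flink.kafka_backport.common.record.KafkaLZ4BlockOutputStream;

    public class Lz4FramingSketch {
        public static void main(String[] args) throws IOException {
            byte[] payload = "a payload worth framing".getBytes();

            // Compress: the frame descriptor is written in the constructor.
            ByteArrayOutputStream framed = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(
                    framed, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, true);
            out.write(payload, 0, payload.length);
            out.flush();   // writes the buffered block (the wrapping DataOutputStream does this on close)
            out.close();   // appends the 0-length end mark

            // Decompress: the constructor reads and validates the frame descriptor.
            KafkaLZ4BlockInputStream in = new KafkaLZ4BlockInputStream(
                    new ByteArrayInputStream(framed.toByteArray()));
            byte[] restored = new byte[payload.length];
            int read = in.read(restored, 0, restored.length);
            in.close();

            System.out.println(read == payload.length && Arrays.equals(payload, restored));
        }
    }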

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/LogEntry.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/LogEntry.java
new file mode 100644
index 0000000..b5d7b2d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/LogEntry.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * An offset and record pair
+ */
+public final class LogEntry {
+
+    private final long offset;
+    private final Record record;
+
+    public LogEntry(long offset, Record record) {
+        this.offset = offset;
+        this.record = record;
+    }
+
+    public long offset() {
+        return this.offset;
+    }
+
+    public Record record() {
+        return this.record;
+    }
+
+    @Override
+    public String toString() {
+        return "LogEntry(" + offset + ", " + record + ")";
+    }
+    
+    public int size() {
+        return record.size() + Records.LOG_OVERHEAD;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/MemoryRecords.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/MemoryRecords.java
new file mode 100644
index 0000000..15e98ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/MemoryRecords.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+import org.apache.flink.kafka_backport.common.utils.AbstractIterator;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A {@link Records} implementation backed by a ByteBuffer.
+ */
+public class MemoryRecords implements Records {
+
+    private final Compressor compressor;
+    private final int capacity;
+    private final int sizeLimit;
+    private ByteBuffer buffer;
+    private boolean writable;
+
+    // Construct a MemoryRecords instance; writable or read-only depending on the flag
+    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
+        this.writable = writable;
+        this.capacity = buffer.capacity();
+        this.sizeLimit = sizeLimit;
+        if (this.writable) {
+            this.buffer = null;
+            this.compressor = new Compressor(buffer, type);
+        } else {
+            this.buffer = buffer;
+            this.compressor = null;
+        }
+    }
+
+    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) {
+        return new MemoryRecords(buffer, type, true, capacity);
+    }
+
+    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
+        return emptyRecords(buffer, type, buffer.capacity());
+    }
+
+    public static MemoryRecords readableRecords(ByteBuffer buffer) {
+        return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
+    }
+
+    /**
+     * Append the given record and offset to the buffer
+     */
+    public void append(long offset, Record record) {
+        if (!writable)
+            throw new IllegalStateException("Memory records is not writable");
+
+        int size = record.size();
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        compressor.put(record.buffer());
+        compressor.recordWritten(size + LOG_OVERHEAD);
+        record.buffer().rewind();
+    }
+
+    /**
+     * Append a new record and offset to the buffer
+     */
+    public void append(long offset, byte[] key, byte[] value) {
+        if (!writable)
+            throw new IllegalStateException("Memory records is not writable");
+
+        int size = Record.recordSize(key, value);
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        compressor.putRecord(key, value);
+        compressor.recordWritten(size + LOG_OVERHEAD);
+    }
+
+    /**
+     * Check if we have room for a new record containing the given key/value pair
+     * 
+     * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
+     * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
+     * re-allocation in the underlying byte buffer stream.
+     * 
+     * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
+     * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
+     * capacity will be the message size, but the size limit will still be the batch size), and when the records' size
+     * has exceeded this limit we also mark the batch as full.
+     */
+    public boolean hasRoomFor(byte[] key, byte[] value) {
+        return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + LOG_OVERHEAD +
+                                                 Record.recordSize(key, value) &&
+               this.sizeLimit >= this.compressor.estimatedBytesWritten();
+    }
+
+    public boolean isFull() {
+        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
+               this.sizeLimit <= this.compressor.estimatedBytesWritten();
+    }
+
+    /**
+     * Close this batch for no more appends
+     */
+    public void close() {
+        if (writable) {
+            compressor.close();
+            writable = false;
+            buffer = compressor.buffer();
+        }
+    }
+
+    /** Write the records in this set to the given channel */
+    public int writeTo(GatheringByteChannel channel) throws IOException {
+        return channel.write(buffer);
+    }
+
+    /**
+     * The size of this record set
+     */
+    public int sizeInBytes() {
+        return compressor.buffer().position();
+    }
+
+    /**
+     * The compression rate of this record set
+     */
+    public double compressionRate() {
+        if (compressor == null)
+            return 1.0;
+        else
+            return compressor.compressionRate();
+    }
+
+    /**
+     * Return the capacity of the buffer
+     */
+    public int capacity() {
+        return this.capacity;
+    }
+
+    /**
+     * Get the byte buffer that backs this records instance
+     */
+    public ByteBuffer buffer() {
+        return buffer.duplicate();
+    }
+
+    /**
+     * Flip the closed buffer for reading records and return it
+     */
+    public ByteBuffer flip() {
+        if (writable)
+            throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
+
+        return (ByteBuffer) buffer.flip();
+    }
+
+    @Override
+    public Iterator<LogEntry> iterator() {
+        ByteBuffer copy = this.buffer.duplicate();
+        return new RecordsIterator(copy, CompressionType.NONE, false);
+    }
+    
+    @Override
+    public String toString() {
+        Iterator<LogEntry> iter = iterator();
+        StringBuilder builder = new StringBuilder();
+        builder.append('[');
+        while (iter.hasNext()) {
+            LogEntry entry = iter.next();
+            builder.append('(');
+            builder.append("offset=");
+            builder.append(entry.offset());
+            builder.append(",");
+            builder.append("record=");
+            builder.append(entry.record());
+            builder.append(")");
+        }
+        builder.append(']');
+        return builder.toString();
+    }
+
+    public static class RecordsIterator extends AbstractIterator<LogEntry> {
+        private final ByteBuffer buffer;
+        private final DataInputStream stream;
+        private final CompressionType type;
+        private final boolean shallow;
+        private RecordsIterator innerIter;
+
+        public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
+            this.type = type;
+            this.buffer = buffer;
+            this.shallow = shallow;
+            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+        }
+
+        /*
+         * Read the next record from the buffer.
+         * 
+         * Note that in the compressed message set, each message value size is set as the size of the un-compressed
+         * version of the message value, so when decompressing, allocating an array of that specified size is
+         * sufficient for reading the compressed value data.
+         */
+        @Override
+        protected LogEntry makeNext() {
+            if (innerDone()) {
+                try {
+                    // read the offset
+                    long offset = stream.readLong();
+                    // read record size
+                    int size = stream.readInt();
+                    if (size < 0)
+                        throw new IllegalStateException("Record with size " + size);
+                    // read the record; if compression is used we cannot depend on size
+                    // and hence have to do an extra copy
+                    ByteBuffer rec;
+                    if (type == CompressionType.NONE) {
+                        rec = buffer.slice();
+                        int newPos = buffer.position() + size;
+                        if (newPos > buffer.limit())
+                            return allDone();
+                        buffer.position(newPos);
+                        rec.limit(size);
+                    } else {
+                        byte[] recordBuffer = new byte[size];
+                        stream.readFully(recordBuffer, 0, size);
+                        rec = ByteBuffer.wrap(recordBuffer);
+                    }
+                    LogEntry entry = new LogEntry(offset, new Record(rec));
+
+                    // decide whether to go shallow or deep iteration if it is compressed
+                    CompressionType compression = entry.record().compressionType();
+                    if (compression == CompressionType.NONE || shallow) {
+                        return entry;
+                    } else {
+                        // init the inner iterator with the value payload of the message,
+                        // which will de-compress the payload to a set of messages;
+                        // since we assume nested compression is not allowed, the deep iterator
+                        // would not try to further decompress underlying messages
+                        ByteBuffer value = entry.record().value();
+                        innerIter = new RecordsIterator(value, compression, true);
+                        return innerIter.next();
+                    }
+                } catch (EOFException e) {
+                    return allDone();
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            } else {
+                return innerIter.next();
+            }
+        }
+
+        private boolean innerDone() {
+            return innerIter == null || !innerIter.hasNext();
+        }
+    }
+}
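
A short usage sketch for MemoryRecords above: build a small batch, close it,
and iterate the entries. Not part of the commit; keys and values are made up
for illustration.

    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import org.apache.flink.kafka_backport.common.record.CompressionType;
    import org.apache.flink.kafka_backport.common.record.LogEntry;
    import org.apache.flink.kafka_backport.common.record.MemoryRecords;

    public class MemoryRecordsSketch {
        public static void main(String[] args) {
            // A writable record set backed by a 4 KB buffer, without compression.
            MemoryRecords records =
                    MemoryRecords.emptyRecords(ByteBuffer.allocate(4096), CompressionType.NONE);

            for (long offset = 0; offset < 3; offset++) {
                byte[] key = ("key-" + offset).getBytes();
                byte[] value = ("value-" + offset).getBytes();
                if (records.hasRoomFor(key, value)) {
                    records.append(offset, key, value);
                }
            }

            records.close();   // no more appends; the batch is finalized
            records.flip();    // flip the underlying buffer so iteration starts at the first entry

            Iterator<LogEntry> iter = records.iterator();
            while (iter.hasNext()) {
                System.out.println(iter.next());   // LogEntry(offset, record)
            }
        }
    }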

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Record.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Record.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Record.java
new file mode 100644
index 0000000..f71900c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Record.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import org.apache.flink.kafka_backport.common.utils.Crc32;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A record: a serialized key and value along with the associated CRC and other fields
+ */
+public final class Record {
+
+    /**
+     * The current offset and size for all the fixed-length fields
+     */
+    public static final int CRC_OFFSET = 0;
+    public static final int CRC_LENGTH = 4;
+    public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
+    public static final int MAGIC_LENGTH = 1;
+    public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+    public static final int ATTRIBUTE_LENGTH = 1;
+    public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int KEY_SIZE_LENGTH = 4;
+    public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
+    public static final int VALUE_SIZE_LENGTH = 4;
+
+    /**
+     * The size for the record header
+     */
+    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
+
+    /**
+     * The amount of overhead bytes in a record
+     */
+    public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+    /**
+     * The current "magic" value
+     */
+    public static final byte CURRENT_MAGIC_VALUE = 0;
+
+    /**
+     * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
+     * compression
+     */
+    public static final int COMPRESSION_CODEC_MASK = 0x07;
+
+    /**
+     * Compression code for uncompressed records
+     */
+    public static final int NO_COMPRESSION = 0;
+
+    private final ByteBuffer buffer;
+
+    public Record(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    /**
+     * A constructor to create a Record. If the record's compression type is not none, then
+     * its value payload should already be compressed with the specified type; the constructor
+     * always writes the value payload as is and does not perform the compression itself.
+     * 
+     * @param key The key of the record (null, if none)
+     * @param value The record value
+     * @param type The compression type used on the contents of the record (if any)
+     * @param valueOffset The offset into the payload array used to extract payload
+     * @param valueSize The size of the payload to use
+     */
+    public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
+                value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
+        write(this.buffer, key, value, type, valueOffset, valueSize);
+        this.buffer.rewind();
+    }
+
+    public Record(byte[] key, byte[] value, CompressionType type) {
+        this(key, value, type, 0, -1);
+    }
+
+    public Record(byte[] value, CompressionType type) {
+        this(null, value, type);
+    }
+
+    public Record(byte[] key, byte[] value) {
+        this(key, value, CompressionType.NONE);
+    }
+
+    public Record(byte[] value) {
+        this(null, value, CompressionType.NONE);
+    }
+
+    // Write a record to the buffer. If the record's compression type is not none, then
+    // its value payload should already be compressed with the specified type.
+    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        // construct the compressor with compression type none, since this function does not do
+        // any compression according to the input type; it just writes the record's payload as is
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
+        compressor.putRecord(key, value, type, valueOffset, valueSize);
+    }
+
+    public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
+        // write crc
+        compressor.putInt((int) (crc & 0xffffffffL));
+        // write magic value
+        compressor.putByte(CURRENT_MAGIC_VALUE);
+        // write attributes
+        compressor.putByte(attributes);
+        // write the key
+        if (key == null) {
+            compressor.putInt(-1);
+        } else {
+            compressor.putInt(key.length);
+            compressor.put(key, 0, key.length);
+        }
+        // write the value
+        if (value == null) {
+            compressor.putInt(-1);
+        } else {
+            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+            compressor.putInt(size);
+            compressor.put(value, valueOffset, size);
+        }
+    }
+
+    public static int recordSize(byte[] key, byte[] value) {
+        return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
+    }
+
+    public static int recordSize(int keySize, int valueSize) {
+        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
+    }
+
+    public ByteBuffer buffer() {
+        return this.buffer;
+    }
+
+    public static byte computeAttributes(CompressionType type) {
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        return attributes;
+    }
+
+    /**
+     * Compute the checksum of the record from the record contents
+     */
+    public static long computeChecksum(ByteBuffer buffer, int position, int size) {
+        Crc32 crc = new Crc32();
+        crc.update(buffer.array(), buffer.arrayOffset() + position, size);
+        return crc.getValue();
+    }
+
+    /**
+     * Compute the checksum of the record from the attributes, key and value payloads
+     */
+    public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        Crc32 crc = new Crc32();
+        crc.update(CURRENT_MAGIC_VALUE);
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        crc.update(attributes);
+        // update for the key
+        if (key == null) {
+            crc.updateInt(-1);
+        } else {
+            crc.updateInt(key.length);
+            crc.update(key, 0, key.length);
+        }
+        // update for the value
+        if (value == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+            crc.updateInt(size);
+            crc.update(value, valueOffset, size);
+        }
+        return crc.getValue();
+    }
+
+
+    /**
+     * Compute the checksum of the record from the record contents
+     */
+    public long computeChecksum() {
+        return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
+    }
+
+    /**
+     * Retrieve the previously computed CRC for this record
+     */
+    public long checksum() {
+        return Utils.readUnsignedInt(buffer, CRC_OFFSET);
+    }
+
+    /**
+     * Returns true if the crc stored with the record matches the crc computed off the record contents
+     */
+    public boolean isValid() {
+        return checksum() == computeChecksum();
+    }
+
+    /**
+     * Throw an InvalidRecordException if isValid is false for this record
+     */
+    public void ensureValid() {
+        if (!isValid())
+            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+                                             + ", computed crc = "
+                                             + computeChecksum()
+                                             + ")");
+    }
+
+    /**
+     * The complete serialized size of this record in bytes (including crc, header attributes, etc)
+     */
+    public int size() {
+        return buffer.limit();
+    }
+
+    /**
+     * The length of the key in bytes
+     */
+    public int keySize() {
+        return buffer.getInt(KEY_SIZE_OFFSET);
+    }
+
+    /**
+     * Does the record have a key?
+     */
+    public boolean hasKey() {
+        return keySize() >= 0;
+    }
+
+    /**
+     * The position where the value size is stored
+     */
+    private int valueSizeOffset() {
+        return KEY_OFFSET + Math.max(0, keySize());
+    }
+
+    /**
+     * The length of the value in bytes
+     */
+    public int valueSize() {
+        return buffer.getInt(valueSizeOffset());
+    }
+
+    /**
+     * The magic version of this record
+     */
+    public byte magic() {
+        return buffer.get(MAGIC_OFFSET);
+    }
+
+    /**
+     * The attributes stored with this record
+     */
+    public byte attributes() {
+        return buffer.get(ATTRIBUTES_OFFSET);
+    }
+
+    /**
+     * The compression type used with this record
+     */
+    public CompressionType compressionType() {
+        return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
+    }
+
+    /**
+     * A ByteBuffer containing the value of this record
+     */
+    public ByteBuffer value() {
+        return sliceDelimited(valueSizeOffset());
+    }
+
+    /**
+     * A ByteBuffer containing the message key
+     */
+    public ByteBuffer key() {
+        return sliceDelimited(KEY_SIZE_OFFSET);
+    }
+
+    /**
+     * Read a size-delimited byte buffer starting at the given offset
+     */
+    private ByteBuffer sliceDelimited(int start) {
+        int size = buffer.getInt(start);
+        if (size < 0) {
+            return null;
+        } else {
+            ByteBuffer b = buffer.duplicate();
+            b.position(start + 4);
+            b = b.slice();
+            b.limit(size);
+            b.rewind();
+            return b;
+        }
+    }
+
+    public String toString() {
+        return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
+                magic(),
+                attributes(),
+                compressionType(),
+                checksum(),
+                key() == null ? 0 : key().limit(),
+                value() == null ? 0 : value().limit());
+    }
+
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+        if (other == null)
+            return false;
+        if (!other.getClass().equals(Record.class))
+            return false;
+        Record record = (Record) other;
+        return this.buffer.equals(record.buffer);
+    }
+
+    public int hashCode() {
+        return buffer.hashCode();
+    }
+
+}
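
Laid end to end, the offset constants above describe the record framing: crc (4) | magic (1) | attributes (1) | key size (4) | key | value size (4) | value. A small sketch of what that implies for sizing and CRC validation, using only the Record API shown above (byte values are illustrative):

    import org.apache.flink.kafka_backport.common.record.CompressionType;
    import org.apache.flink.kafka_backport.common.record.Record;

    public class RecordLayoutSketch {

        public static void main(String[] args) {
            byte[] key = "k".getBytes();
            byte[] value = "v".getBytes();

            // CompressionType.NONE: the value bytes are written as is.
            Record record = new Record(key, value, CompressionType.NONE);

            // 4 (crc) + 1 (magic) + 1 (attributes) + 4 (key size) + 1 (key)
            //   + 4 (value size) + 1 (value) = 16 bytes
            System.out.println(record.size());                                 // 16
            System.out.println(Record.recordSize(key, value));                 // 16
            System.out.println(record.compressionType());                      // NONE
            System.out.println(record.checksum() == record.computeChecksum()); // true

            // ensureValid() throws InvalidRecordException if the stored CRC does not
            // match the CRC computed over magic, attributes, key and value.
            record.ensureValid();
        }
    }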

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Records.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Records.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Records.java
new file mode 100644
index 0000000..433748b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/Records.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
+ * for the in-memory representation.
+ */
+public interface Records extends Iterable<LogEntry> {
+
+    int SIZE_LENGTH = 4;
+    int OFFSET_LENGTH = 8;
+    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
+
+    /**
+     * Write these records to the given channel
+     * @param channel The channel to write to
+     * @return The number of bytes written
+     * @throws IOException If the write fails.
+     */
+    public int writeTo(GatheringByteChannel channel) throws IOException;
+
+    /**
+     * The size of these records in bytes
+     */
+    public int sizeInBytes();
+
+}
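
Together with Record.RECORD_OVERHEAD, the constants above give the full cost of one entry in a log buffer: 8 bytes of offset plus 4 bytes of size plus the serialized record. A minimal arithmetic sketch:

    import org.apache.flink.kafka_backport.common.record.Record;
    import org.apache.flink.kafka_backport.common.record.Records;

    public class LogOverheadSketch {

        public static void main(String[] args) {
            int keyBytes = 1;
            int valueBytes = 1;

            int recordBytes = Record.recordSize(keyBytes, valueBytes);   // 14 bytes overhead + 1 + 1 = 16
            int logEntryBytes = Records.LOG_OVERHEAD + recordBytes;      // 12 + 16 = 28

            System.out.println("record = " + recordBytes + " bytes, log entry = " + logEntryBytes + " bytes");
        }
    }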

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequest.java
new file mode 100644
index 0000000..e888d1e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public abstract class AbstractRequest extends AbstractRequestResponse {
+
+    public AbstractRequest(Struct struct) {
+        super(struct);
+    }
+
+    /**
+     * Get an error response for this request for a given API version
+     */
+    public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
+
+    /**
+     * Factory method for getting a request object based on ApiKey ID and a buffer
+     */
+    public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
+        switch (ApiKeys.forId(requestId)) {
+            case PRODUCE:
+                return ProduceRequest.parse(buffer, versionId);
+            case FETCH:
+                return FetchRequest.parse(buffer, versionId);
+            case LIST_OFFSETS:
+                return ListOffsetRequest.parse(buffer, versionId);
+            case METADATA:
+                return MetadataRequest.parse(buffer, versionId);
+            case OFFSET_COMMIT:
+                return OffsetCommitRequest.parse(buffer, versionId);
+            case OFFSET_FETCH:
+                return OffsetFetchRequest.parse(buffer, versionId);
+            case CONSUMER_METADATA:
+                return ConsumerMetadataRequest.parse(buffer, versionId);
+            case JOIN_GROUP:
+                return JoinGroupRequest.parse(buffer, versionId);
+            case HEARTBEAT:
+                return HeartbeatRequest.parse(buffer, versionId);
+            default:
+                return null;
+        }
+    }
+}
\ No newline at end of file
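
As a sketch of how the factory above is meant to be used: the request header (api key and version) is read first, and the remaining buffer is handed to getRequest, which dispatches to the matching parse method. This uses the ConsumerMetadataRequest shown later in this commit as the example payload; version 0 is assumed to be the only ConsumerMetadata version defined at this point.

    import java.nio.ByteBuffer;

    import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
    import org.apache.flink.kafka_backport.common.requests.AbstractRequest;
    import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataRequest;

    public class RequestFactorySketch {

        public static void main(String[] args) {
            // Serialize a request body (the group id is illustrative).
            ConsumerMetadataRequest original = new ConsumerMetadataRequest("example-group");
            ByteBuffer body = ByteBuffer.allocate(original.sizeOf());
            original.writeTo(body);
            body.flip();

            // Dispatch by api key id and version, exactly as the switch above does.
            AbstractRequest parsed = AbstractRequest.getRequest(ApiKeys.CONSUMER_METADATA.id, 0, body);
            System.out.println(((ConsumerMetadataRequest) parsed).groupId());   // example-group
        }
    }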

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequestResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequestResponse.java
new file mode 100644
index 0000000..002e8db
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/AbstractRequestResponse.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public abstract class AbstractRequestResponse {
+    protected final Struct struct;
+
+
+    public AbstractRequestResponse(Struct struct) {
+        this.struct = struct;
+    }
+
+    public Struct toStruct() {
+        return struct;
+    }
+
+    /**
+     * Get the serialized size of this object
+     */
+    public int sizeOf() {
+        return struct.sizeOf();
+    }
+
+    /**
+     * Write this object to a buffer
+     */
+    public void writeTo(ByteBuffer buffer) {
+        struct.writeTo(buffer);
+    }
+
+    @Override
+    public String toString() {
+        return struct.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return struct.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        AbstractRequestResponse other = (AbstractRequestResponse) obj;
+        return struct.equals(other.struct);
+    }
+}
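
The sizeOf()/writeTo() pair above is the serialization contract every request and response in this package shares, so a caller can always allocate an exactly-sized buffer up front. A small helper sketch illustrating that pattern:

    import java.nio.ByteBuffer;

    import org.apache.flink.kafka_backport.common.requests.AbstractRequestResponse;

    public final class RequestSerialization {

        private RequestSerialization() {}

        /**
         * Serialize any backported request or response into a buffer that is
         * exactly as large as the message needs, flipped and ready for reading.
         */
        public static ByteBuffer serialize(AbstractRequestResponse message) {
            ByteBuffer buffer = ByteBuffer.allocate(message.sizeOf());
            message.writeTo(buffer);
            buffer.flip();
            return buffer;
        }
    }

This is roughly what the client's send path does before handing the bytes to the network layer, with a request header written in front of the body.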

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataRequest.java
new file mode 100644
index 0000000..e4f5e90
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataRequest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ConsumerMetadataRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+
+    private final String groupId;
+
+    public ConsumerMetadataRequest(String groupId) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        this.groupId = groupId;
+    }
+
+    public ConsumerMetadataRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
+        return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
+    }
+
+    public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
+        return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
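
A round-trip sketch for the request above, including the broker-side error path that getErrorResponse models (the group name is illustrative):

    import java.nio.ByteBuffer;

    import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataRequest;
    import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataResponse;

    public class ConsumerMetadataRequestSketch {

        public static void main(String[] args) {
            ConsumerMetadataRequest request = new ConsumerMetadataRequest("example-group");

            // serialize and parse back (schema version 0)
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.flip();
            ConsumerMetadataRequest parsed = ConsumerMetadataRequest.parse(buffer);
            System.out.println(parsed.groupId());   // example-group

            // if the request cannot be served, the version-0 error response carries
            // CONSUMER_COORDINATOR_NOT_AVAILABLE and a "no node" placeholder coordinator
            ConsumerMetadataResponse error =
                    (ConsumerMetadataResponse) request.getErrorResponse(0, new RuntimeException("disconnected"));
            System.out.println(error.errorCode());
        }
    }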

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataResponse.java
new file mode 100644
index 0000000..3688eda
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ConsumerMetadataResponse.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ConsumerMetadataResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+    // coordinator level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private final short errorCode;
+    private final Node node;
+
+    public ConsumerMetadataResponse(short errorCode, Node node) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        this.errorCode = errorCode;
+        this.node = node;
+    }
+
+    public ConsumerMetadataResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        node = new Node(nodeId, host, port);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
+        return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file
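
To close the loop, a consumer-side sketch for the response above: check the error code, then use the coordinator node's host and port. Errors.NONE is assumed to exist in the backported Errors enum (it is not part of this excerpt), and the broker hostname is illustrative.

    import java.nio.ByteBuffer;

    import org.apache.flink.kafka_backport.common.Node;
    import org.apache.flink.kafka_backport.common.protocol.Errors;
    import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataResponse;

    public class ConsumerMetadataResponseSketch {

        public static void main(String[] args) {
            // Pretend this buffer arrived from a broker: build and serialize a response first.
            ConsumerMetadataResponse sent =
                    new ConsumerMetadataResponse(Errors.NONE.code(), new Node(1, "broker-1.example.com", 9092));
            ByteBuffer buffer = ByteBuffer.allocate(sent.sizeOf());
            sent.writeTo(buffer);
            buffer.flip();

            ConsumerMetadataResponse received = ConsumerMetadataResponse.parse(buffer);
            if (received.errorCode() == Errors.NONE.code()) {
                Node coordinator = received.node();
                System.out.println("coordinator: " + coordinator.host() + ":" + coordinator.port());
            }
        }
    }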