Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:20 UTC
[03/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java
deleted file mode 100644
index 94f55a6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/MemoryRecords.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.record;
-
-import org.apache.kafka.copied.common.KafkaException;
-import org.apache.kafka.copied.common.utils.AbstractIterator;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-
-/**
- * A {@link Records} implementation backed by a ByteBuffer.
- */
-public class MemoryRecords implements Records {
-
- private final Compressor compressor;
- private final int capacity;
- private final int sizeLimit;
- private ByteBuffer buffer;
- private boolean writable;
-
- // Construct a MemoryRecords instance (writable or read-only, depending on the flag)
- private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
- this.writable = writable;
- this.capacity = buffer.capacity();
- this.sizeLimit = sizeLimit;
- if (this.writable) {
- this.buffer = null;
- this.compressor = new Compressor(buffer, type);
- } else {
- this.buffer = buffer;
- this.compressor = null;
- }
- }
-
- public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) {
- return new MemoryRecords(buffer, type, true, capacity);
- }
-
- public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
- return emptyRecords(buffer, type, buffer.capacity());
- }
-
- public static MemoryRecords readableRecords(ByteBuffer buffer) {
- return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
- }
-
- /**
- * Append the given record and offset to the buffer
- */
- public void append(long offset, Record record) {
- if (!writable)
- throw new IllegalStateException("Memory records is not writable");
-
- int size = record.size();
- compressor.putLong(offset);
- compressor.putInt(size);
- compressor.put(record.buffer());
- compressor.recordWritten(size + LOG_OVERHEAD);
- record.buffer().rewind();
- }
-
- /**
- * Append a new record and offset to the buffer
- */
- public void append(long offset, byte[] key, byte[] value) {
- if (!writable)
- throw new IllegalStateException("Memory records is not writable");
-
- int size = Record.recordSize(key, value);
- compressor.putLong(offset);
- compressor.putInt(size);
- compressor.putRecord(key, value);
- compressor.recordWritten(size + LOG_OVERHEAD);
- }
-
- /**
- * Check if we have room for a new record containing the given key/value pair
- *
- * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
- * accurate if compression is actually used. When this happens, the following append may cause dynamic buffer
- * re-allocation in the underlying byte buffer stream.
- *
- * Also note that besides the record set's capacity, there is also a size limit for the batch. This size limit may
- * be smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
- * capacity will be the message size, but the size limit will still be the batch size), and when the record set's
- * size has exceeded this limit we also mark this record set as full.
- */
- public boolean hasRoomFor(byte[] key, byte[] value) {
- return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + LOG_OVERHEAD +
- Record.recordSize(key, value) &&
- this.sizeLimit >= this.compressor.estimatedBytesWritten();
- }
-
- public boolean isFull() {
- return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
- this.sizeLimit <= this.compressor.estimatedBytesWritten();
- }
-
- /**
- * Close this batch so that no more records can be appended
- */
- public void close() {
- if (writable) {
- compressor.close();
- writable = false;
- buffer = compressor.buffer();
- }
- }
-
- /** Write the records in this set to the given channel */
- public int writeTo(GatheringByteChannel channel) throws IOException {
- return channel.write(buffer);
- }
-
- /**
- * The size of this record set
- */
- public int sizeInBytes() {
- return compressor.buffer().position();
- }
-
- /**
- * The compression rate of this record set
- */
- public double compressionRate() {
- if (compressor == null)
- return 1.0;
- else
- return compressor.compressionRate();
- }
-
- /**
- * Return the capacity of the buffer
- */
- public int capacity() {
- return this.capacity;
- }
-
- /**
- * Get the byte buffer that backs this records instance
- */
- public ByteBuffer buffer() {
- return buffer.duplicate();
- }
-
- /**
- * Flip the closed buffer and return it for reading records
- */
- public ByteBuffer flip() {
- if (writable)
- throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
-
- return (ByteBuffer) buffer.flip();
- }
-
- @Override
- public Iterator<LogEntry> iterator() {
- ByteBuffer copy = this.buffer.duplicate();
- return new RecordsIterator(copy, CompressionType.NONE, false);
- }
-
- @Override
- public String toString() {
- Iterator<LogEntry> iter = iterator();
- StringBuilder builder = new StringBuilder();
- builder.append('[');
- while (iter.hasNext()) {
- LogEntry entry = iter.next();
- builder.append('(');
- builder.append("offset=");
- builder.append(entry.offset());
- builder.append(",");
- builder.append("record=");
- builder.append(entry.record());
- builder.append(")");
- }
- builder.append(']');
- return builder.toString();
- }
-
- public static class RecordsIterator extends AbstractIterator<LogEntry> {
- private final ByteBuffer buffer;
- private final DataInputStream stream;
- private final CompressionType type;
- private final boolean shallow;
- private RecordsIterator innerIter;
-
- public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
- this.type = type;
- this.buffer = buffer;
- this.shallow = shallow;
- this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
- }
-
- /*
- * Read the next record from the buffer.
- *
- * Note that in the compressed message set, each message value size is set to the size of the un-compressed
- * version of the message value, so when we de-compress, allocating an array of that size is sufficient
- * for reading the compressed value data.
- */
- @Override
- protected LogEntry makeNext() {
- if (innerDone()) {
- try {
- // read the offset
- long offset = stream.readLong();
- // read record size
- int size = stream.readInt();
- if (size < 0)
- throw new IllegalStateException("Record with size " + size);
- // read the record; if compression is used we cannot depend on size
- // and hence have to do an extra copy
- ByteBuffer rec;
- if (type == CompressionType.NONE) {
- rec = buffer.slice();
- int newPos = buffer.position() + size;
- if (newPos > buffer.limit())
- return allDone();
- buffer.position(newPos);
- rec.limit(size);
- } else {
- byte[] recordBuffer = new byte[size];
- stream.readFully(recordBuffer, 0, size);
- rec = ByteBuffer.wrap(recordBuffer);
- }
- LogEntry entry = new LogEntry(offset, new Record(rec));
-
- // decide whether to go shallow or deep iteration if it is compressed
- CompressionType compression = entry.record().compressionType();
- if (compression == CompressionType.NONE || shallow) {
- return entry;
- } else {
- // init the inner iterator with the value payload of the message,
- // which will de-compress the payload to a set of messages;
- // since we assume nested compression is not allowed, the deep iterator
- // would not try to further decompress underlying messages
- ByteBuffer value = entry.record().value();
- innerIter = new RecordsIterator(value, compression, true);
- return innerIter.next();
- }
- } catch (EOFException e) {
- return allDone();
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- } else {
- return innerIter.next();
- }
- }
-
- private boolean innerDone() {
- return innerIter == null || !innerIter.hasNext();
- }
- }
-}
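
For readers following the iterator logic above: the uncompressed (shallow) path of RecordsIterator.makeNext() reduces to plain ByteBuffer arithmetic over the offset/size framing. A minimal, self-contained sketch of that path follows; the class name ShallowLogEntryReader is illustrative (not part of this commit), and the 12-byte constant corresponds to LOG_OVERHEAD (8-byte offset + 4-byte size) from the Records interface.

import java.nio.ByteBuffer;

public class ShallowLogEntryReader {

    // Walk a record-set buffer entry by entry, mirroring the
    // CompressionType.NONE branch of RecordsIterator.makeNext().
    public static void readAll(ByteBuffer buffer) {
        while (buffer.remaining() >= 12) {            // LOG_OVERHEAD = 8 + 4
            long offset = buffer.getLong();           // read the entry offset
            int size = buffer.getInt();               // read the record size
            if (size < 0 || size > buffer.remaining())
                return;                               // truncated tail; stop, like allDone()
            ByteBuffer rec = buffer.slice();          // zero-copy view of the record bytes
            rec.limit(size);
            buffer.position(buffer.position() + size);
            System.out.println("offset=" + offset + ", recordBytes=" + size);
        }
    }
}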
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java
deleted file mode 100644
index 54b4084..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Record.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.record;
-
-import org.apache.kafka.copied.common.utils.Crc32;
-import org.apache.kafka.copied.common.utils.Utils;
-
-import java.nio.ByteBuffer;
-
-
-/**
- * A record: a serialized key and value along with the associated CRC and other fields
- */
-public final class Record {
-
- /**
- * The current offset and size for all the fixed-length fields
- */
- public static final int CRC_OFFSET = 0;
- public static final int CRC_LENGTH = 4;
- public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
- public static final int MAGIC_LENGTH = 1;
- public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
- public static final int ATTRIBUTE_LENGTH = 1;
- public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
- public static final int KEY_SIZE_LENGTH = 4;
- public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
- public static final int VALUE_SIZE_LENGTH = 4;
-
- /**
- * The size for the record header
- */
- public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
-
- /**
- * The amount of overhead bytes in a record
- */
- public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
-
- /**
- * The current "magic" value
- */
- public static final byte CURRENT_MAGIC_VALUE = 0;
-
- /**
- * Specifies the mask for the compression code: 3 bits hold the compression codec, and 0 is reserved to
- * indicate no compression.
- */
- public static final int COMPRESSION_CODEC_MASK = 0x07;
-
- /**
- * Compression code for uncompressed records
- */
- public static final int NO_COMPRESSION = 0;
-
- private final ByteBuffer buffer;
-
- public Record(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- /**
- * A constructor to create a Record. If the record's compression type is not none, then
- * its value payload should already be compressed with the specified type; the constructor
- * always writes the value payload as-is and does not do the compression itself.
- *
- * @param key The key of the record (null, if none)
- * @param value The record value
- * @param type The compression type used on the contents of the record (if any)
- * @param valueOffset The offset into the payload array used to extract payload
- * @param valueSize The size of the payload to use
- */
- public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
- this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
- value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
- write(this.buffer, key, value, type, valueOffset, valueSize);
- this.buffer.rewind();
- }
-
- public Record(byte[] key, byte[] value, CompressionType type) {
- this(key, value, type, 0, -1);
- }
-
- public Record(byte[] value, CompressionType type) {
- this(null, value, type);
- }
-
- public Record(byte[] key, byte[] value) {
- this(key, value, CompressionType.NONE);
- }
-
- public Record(byte[] value) {
- this(null, value, CompressionType.NONE);
- }
-
- // Write a record to the buffer. If the record's compression type is not none, then
- // its value payload should already be compressed with the specified type.
- public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
- // construct the compressor with compression type none since this function will not do any
- // compression according to the input type; it will just write the record's payload as-is
- Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
- compressor.putRecord(key, value, type, valueOffset, valueSize);
- }
-
- public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
- // write crc
- compressor.putInt((int) (crc & 0xffffffffL));
- // write magic value
- compressor.putByte(CURRENT_MAGIC_VALUE);
- // write attributes
- compressor.putByte(attributes);
- // write the key
- if (key == null) {
- compressor.putInt(-1);
- } else {
- compressor.putInt(key.length);
- compressor.put(key, 0, key.length);
- }
- // write the value
- if (value == null) {
- compressor.putInt(-1);
- } else {
- int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
- compressor.putInt(size);
- compressor.put(value, valueOffset, size);
- }
- }
-
- public static int recordSize(byte[] key, byte[] value) {
- return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
- }
-
- public static int recordSize(int keySize, int valueSize) {
- return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
- }
-
- public ByteBuffer buffer() {
- return this.buffer;
- }
-
- public static byte computeAttributes(CompressionType type) {
- byte attributes = 0;
- if (type.id > 0)
- attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
- return attributes;
- }
-
- /**
- * Compute the checksum of the record from the record contents
- */
- public static long computeChecksum(ByteBuffer buffer, int position, int size) {
- Crc32 crc = new Crc32();
- crc.update(buffer.array(), buffer.arrayOffset() + position, size);
- return crc.getValue();
- }
-
- /**
- * Compute the checksum of the record from the attributes, key and value payloads
- */
- public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
- Crc32 crc = new Crc32();
- crc.update(CURRENT_MAGIC_VALUE);
- byte attributes = 0;
- if (type.id > 0)
- attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
- crc.update(attributes);
- // update for the key
- if (key == null) {
- crc.updateInt(-1);
- } else {
- crc.updateInt(key.length);
- crc.update(key, 0, key.length);
- }
- // update for the value
- if (value == null) {
- crc.updateInt(-1);
- } else {
- int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
- crc.updateInt(size);
- crc.update(value, valueOffset, size);
- }
- return crc.getValue();
- }
-
-
- /**
- * Compute the checksum of the record from the record contents
- */
- public long computeChecksum() {
- return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
- }
-
- /**
- * Retrieve the previously computed CRC for this record
- */
- public long checksum() {
- return Utils.readUnsignedInt(buffer, CRC_OFFSET);
- }
-
- /**
- * Returns true if the crc stored with the record matches the crc computed off the record contents
- */
- public boolean isValid() {
- return checksum() == computeChecksum();
- }
-
- /**
- * Throw an InvalidRecordException if isValid is false for this record
- */
- public void ensureValid() {
- if (!isValid())
- throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
- + ", computed crc = "
- + computeChecksum()
- + ")");
- }
-
- /**
- * The complete serialized size of this record in bytes (including crc, header attributes, etc)
- */
- public int size() {
- return buffer.limit();
- }
-
- /**
- * The length of the key in bytes
- */
- public int keySize() {
- return buffer.getInt(KEY_SIZE_OFFSET);
- }
-
- /**
- * Does the record have a key?
- */
- public boolean hasKey() {
- return keySize() >= 0;
- }
-
- /**
- * The position where the value size is stored
- */
- private int valueSizeOffset() {
- return KEY_OFFSET + Math.max(0, keySize());
- }
-
- /**
- * The length of the value in bytes
- */
- public int valueSize() {
- return buffer.getInt(valueSizeOffset());
- }
-
- /**
- * The magic version of this record
- */
- public byte magic() {
- return buffer.get(MAGIC_OFFSET);
- }
-
- /**
- * The attributes stored with this record
- */
- public byte attributes() {
- return buffer.get(ATTRIBUTES_OFFSET);
- }
-
- /**
- * The compression type used with this record
- */
- public CompressionType compressionType() {
- return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
- }
-
- /**
- * A ByteBuffer containing the value of this record
- */
- public ByteBuffer value() {
- return sliceDelimited(valueSizeOffset());
- }
-
- /**
- * A ByteBuffer containing the message key
- */
- public ByteBuffer key() {
- return sliceDelimited(KEY_SIZE_OFFSET);
- }
-
- /**
- * Read a size-delimited byte buffer starting at the given offset
- */
- private ByteBuffer sliceDelimited(int start) {
- int size = buffer.getInt(start);
- if (size < 0) {
- return null;
- } else {
- ByteBuffer b = buffer.duplicate();
- b.position(start + 4);
- b = b.slice();
- b.limit(size);
- b.rewind();
- return b;
- }
- }
-
- public String toString() {
- return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
- magic(),
- attributes(),
- compressionType(),
- checksum(),
- key() == null ? 0 : key().limit(),
- value() == null ? 0 : value().limit());
- }
-
- public boolean equals(Object other) {
- if (this == other)
- return true;
- if (other == null)
- return false;
- if (!other.getClass().equals(Record.class))
- return false;
- Record record = (Record) other;
- return this.buffer.equals(record.buffer);
- }
-
- public int hashCode() {
- return buffer.hashCode();
- }
-
-}
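
The fixed-length offsets above (CRC at 0, then magic, attributes, and the length-prefixed key and value) fully determine an uncompressed record's byte layout, and computeChecksum() covers everything after the 4-byte CRC field. A self-contained sketch of that layout and check follows, assuming the copied Crc32 utility computes standard CRC-32 (java.util.zip.CRC32 is used here in its place); the class name RecordLayoutDemo is illustrative, not part of this commit.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class RecordLayoutDemo {
    public static void main(String[] args) {
        byte[] key = "k".getBytes();
        byte[] value = "v".getBytes();
        // recordSize = CRC(4) + magic(1) + attributes(1) + keySize(4) + key + valueSize(4) + value
        ByteBuffer buf = ByteBuffer.allocate(4 + 1 + 1 + 4 + key.length + 4 + value.length);
        buf.position(4);                  // leave room for the CRC field
        buf.put((byte) 0);                // CURRENT_MAGIC_VALUE
        buf.put((byte) 0);                // attributes: NO_COMPRESSION
        buf.putInt(key.length).put(key);
        buf.putInt(value.length).put(value);

        // the checksum covers everything after the CRC field (MAGIC_OFFSET onwards)
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, buf.position() - 4);
        buf.putInt(0, (int) (crc.getValue() & 0xffffffffL));

        // verify, mirroring isValid(): stored crc == recomputed crc
        CRC32 check = new CRC32();
        check.update(buf.array(), 4, buf.position() - 4);
        System.out.println("valid = " + ((buf.getInt(0) & 0xffffffffL) == check.getValue()));
    }
}

In the class above, Compressor.putRecord() performs these same steps through the static write() method rather than by hand.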
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java
deleted file mode 100644
index 06f6668..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/record/Records.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.record;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A binary format which consists of an 8 byte offset, a 4 byte size, and the record bytes. See {@link MemoryRecords}
- * for the in-memory representation.
- */
-public interface Records extends Iterable<LogEntry> {
-
- int SIZE_LENGTH = 4;
- int OFFSET_LENGTH = 8;
- int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
-
- /**
- * Write these records to the given channel
- * @param channel The channel to write to
- * @return The number of bytes written
- * @throws IOException If the write fails.
- */
- public int writeTo(GatheringByteChannel channel) throws IOException;
-
- /**
- * The size of these records in bytes
- */
- public int sizeInBytes();
-
-}
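
The framing this interface describes is easy to reproduce with plain NIO: each log entry is an 8-byte offset, a 4-byte size, then the record bytes, and LOG_OVERHEAD is their 12-byte sum. A minimal sketch (the class name LogEntryWriter is illustrative, not part of this commit):

import java.nio.ByteBuffer;

public class LogEntryWriter {
    public static ByteBuffer frame(long offset, byte[] recordBytes) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + recordBytes.length); // LOG_OVERHEAD + payload
        buf.putLong(offset);                // OFFSET_LENGTH = 8
        buf.putInt(recordBytes.length);     // SIZE_LENGTH = 4
        buf.put(recordBytes);
        buf.flip();                         // ready for reading, or for writeTo(channel)
        return buf;
    }
}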
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java
deleted file mode 100644
index 53972c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public abstract class AbstractRequest extends AbstractRequestResponse {
-
- public AbstractRequest(Struct struct) {
- super(struct);
- }
-
- /**
- * Get an error response for this request, for the given API version
- */
- public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
-
- /**
- * Factory method for getting a request object based on ApiKey ID and a buffer
- */
- public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
- switch (ApiKeys.forId(requestId)) {
- case PRODUCE:
- return ProduceRequest.parse(buffer, versionId);
- case FETCH:
- return FetchRequest.parse(buffer, versionId);
- case LIST_OFFSETS:
- return ListOffsetRequest.parse(buffer, versionId);
- case METADATA:
- return MetadataRequest.parse(buffer, versionId);
- case OFFSET_COMMIT:
- return OffsetCommitRequest.parse(buffer, versionId);
- case OFFSET_FETCH:
- return OffsetFetchRequest.parse(buffer, versionId);
- case CONSUMER_METADATA:
- return ConsumerMetadataRequest.parse(buffer, versionId);
- case JOIN_GROUP:
- return JoinGroupRequest.parse(buffer, versionId);
- case HEARTBEAT:
- return HeartbeatRequest.parse(buffer, versionId);
- default:
- return null;
- }
- }
-}
\ No newline at end of file
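
The getRequest() factory above lets a handler re-materialize any request from its api key id, version, and raw buffer. A short sketch of that dispatch, assuming the backported classes are on the classpath (HeartbeatRequest, used here as the concrete type, appears later in this message; the group and consumer names are illustrative):

import java.nio.ByteBuffer;
import org.apache.kafka.copied.common.protocol.ApiKeys;
import org.apache.kafka.copied.common.requests.AbstractRequest;
import org.apache.kafka.copied.common.requests.HeartbeatRequest;

public class RequestFactoryDemo {
    public static void main(String[] args) {
        HeartbeatRequest original = new HeartbeatRequest("my-group", 1, "consumer-1");
        ByteBuffer buf = ByteBuffer.allocate(original.sizeOf());
        original.writeTo(buf);
        buf.flip();
        // dispatch by api key id and version, as a broker-side handler would
        AbstractRequest parsed = AbstractRequest.getRequest(ApiKeys.HEARTBEAT.id, 0, buf);
        System.out.println(parsed instanceof HeartbeatRequest);   // -> true
    }
}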
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java
deleted file mode 100644
index b070fec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/AbstractRequestResponse.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public abstract class AbstractRequestResponse {
- protected final Struct struct;
-
-
- public AbstractRequestResponse(Struct struct) {
- this.struct = struct;
- }
-
- public Struct toStruct() {
- return struct;
- }
-
- /**
- * Get the serialized size of this object
- */
- public int sizeOf() {
- return struct.sizeOf();
- }
-
- /**
- * Write this object to a buffer
- */
- public void writeTo(ByteBuffer buffer) {
- struct.writeTo(buffer);
- }
-
- @Override
- public String toString() {
- return struct.toString();
- }
-
- @Override
- public int hashCode() {
- return struct.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- AbstractRequestResponse other = (AbstractRequestResponse) obj;
- return struct.equals(other.struct);
- }
-}
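
The sizeOf()/writeTo() pair above defines the serialization contract for every request and response: size a buffer exactly, fill it, flip it for sending. A minimal helper sketch under that contract, assuming the backported classes are on the classpath (the class name RequestSerializer is illustrative):

import java.nio.ByteBuffer;
import org.apache.kafka.copied.common.requests.AbstractRequestResponse;

public class RequestSerializer {
    public static ByteBuffer serialize(AbstractRequestResponse message) {
        ByteBuffer buf = ByteBuffer.allocate(message.sizeOf()); // exact size from the struct
        message.writeTo(buf);
        buf.flip();   // position -> 0, limit -> bytes written
        return buf;
    }
}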
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java
deleted file mode 100644
index c5d65b2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
- private static final String GROUP_ID_KEY_NAME = "group_id";
-
- private final String groupId;
-
- public ConsumerMetadataRequest(String groupId) {
- super(new Struct(CURRENT_SCHEMA));
-
- struct.set(GROUP_ID_KEY_NAME, groupId);
- this.groupId = groupId;
- }
-
- public ConsumerMetadataRequest(Struct struct) {
- super(struct);
- groupId = struct.getString(GROUP_ID_KEY_NAME);
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- switch (versionId) {
- case 0:
- return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
- }
- }
-
- public String groupId() {
- return groupId;
- }
-
- public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
- return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
- }
-
- public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
- return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
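
The two parse() overloads above make round-tripping a request through its buffer form straightforward. A sketch, assuming the backported classes are on the classpath ('my-group' and the class name ConsumerMetadataRoundTrip are illustrative):

import java.nio.ByteBuffer;
import org.apache.kafka.copied.common.requests.ConsumerMetadataRequest;

public class ConsumerMetadataRoundTrip {
    public static void main(String[] args) {
        ConsumerMetadataRequest request = new ConsumerMetadataRequest("my-group");
        ByteBuffer buf = ByteBuffer.allocate(request.sizeOf());
        request.writeTo(buf);
        buf.flip();
        // re-materialize from the serialized form via the current schema
        ConsumerMetadataRequest parsed = ConsumerMetadataRequest.parse(buf);
        System.out.println(parsed.groupId());   // -> my-group
    }
}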
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java
deleted file mode 100644
index 68fd4cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ConsumerMetadataResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ConsumerMetadataResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
- private static final String ERROR_CODE_KEY_NAME = "error_code";
- private static final String COORDINATOR_KEY_NAME = "coordinator";
-
- // coordinator level field names
- private static final String NODE_ID_KEY_NAME = "node_id";
- private static final String HOST_KEY_NAME = "host";
- private static final String PORT_KEY_NAME = "port";
-
- private final short errorCode;
- private final Node node;
-
- public ConsumerMetadataResponse(short errorCode, Node node) {
- super(new Struct(CURRENT_SCHEMA));
- struct.set(ERROR_CODE_KEY_NAME, errorCode);
- Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
- coordinator.set(NODE_ID_KEY_NAME, node.id());
- coordinator.set(HOST_KEY_NAME, node.host());
- coordinator.set(PORT_KEY_NAME, node.port());
- struct.set(COORDINATOR_KEY_NAME, coordinator);
- this.errorCode = errorCode;
- this.node = node;
- }
-
- public ConsumerMetadataResponse(Struct struct) {
- super(struct);
- errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
- Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
- int nodeId = broker.getInt(NODE_ID_KEY_NAME);
- String host = broker.getString(HOST_KEY_NAME);
- int port = broker.getInt(PORT_KEY_NAME);
- node = new Node(nodeId, host, port);
- }
-
- public short errorCode() {
- return errorCode;
- }
-
- public Node node() {
- return node;
- }
-
- public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
- return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java
deleted file mode 100644
index 78069ae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchRequest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FetchRequest extends AbstractRequest {
-
- public static final int CONSUMER_REPLICA_ID = -1;
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
- private static final String REPLICA_ID_KEY_NAME = "replica_id";
- private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
- private static final String MIN_BYTES_KEY_NAME = "min_bytes";
- private static final String TOPICS_KEY_NAME = "topics";
-
- // topic level field names
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- // partition level field names
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
- private static final String MAX_BYTES_KEY_NAME = "max_bytes";
-
- private final int replicaId;
- private final int maxWait;
- private final int minBytes;
- private final Map<TopicPartition, PartitionData> fetchData;
-
- public static final class PartitionData {
- public final long offset;
- public final int maxBytes;
-
- public PartitionData(long offset, int maxBytes) {
- this.offset = offset;
- this.maxBytes = maxBytes;
- }
- }
-
- /**
- * Create a non-replica fetch request
- */
- public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
- this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
- }
-
- /**
- * Create a replica fetch request
- */
- public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
- super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
-
- struct.set(REPLICA_ID_KEY_NAME, replicaId);
- struct.set(MAX_WAIT_KEY_NAME, maxWait);
- struct.set(MIN_BYTES_KEY_NAME, minBytes);
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
- Struct topicData = struct.instance(TOPICS_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
- PartitionData fetchPartitionData = partitionEntry.getValue();
- Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
- partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
- partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
- partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(TOPICS_KEY_NAME, topicArray.toArray());
- this.replicaId = replicaId;
- this.maxWait = maxWait;
- this.minBytes = minBytes;
- this.fetchData = fetchData;
- }
-
- public FetchRequest(Struct struct) {
- super(struct);
- replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
- maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
- minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
- fetchData = new HashMap<TopicPartition, PartitionData>();
- for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
- int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
- PartitionData partitionData = new PartitionData(offset, maxBytes);
- fetchData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
-
- for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
- FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
- FetchResponse.INVALID_HIGHWATERMARK,
- FetchResponse.EMPTY_RECORD_SET);
- responseData.put(entry.getKey(), partitionResponse);
- }
-
- switch (versionId) {
- case 0:
- return new FetchResponse(responseData);
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
- }
- }
-
- public int replicaId() {
- return replicaId;
- }
-
- public int maxWait() {
- return maxWait;
- }
-
- public int minBytes() {
- return minBytes;
- }
-
- public Map<TopicPartition, PartitionData> fetchData() {
- return fetchData;
- }
-
- public static FetchRequest parse(ByteBuffer buffer, int versionId) {
- return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
- }
-
- public static FetchRequest parse(ByteBuffer buffer) {
- return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
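
A fetch request is driven entirely by the fetchData map above: one PartitionData (offset, maxBytes) per TopicPartition, plus the maxWait/minBytes knobs. A sketch of building a consumer (non-replica) request, assuming the backported classes are on the classpath; the topic name and numeric values are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.copied.common.TopicPartition;
import org.apache.kafka.copied.common.requests.FetchRequest;

public class FetchRequestDemo {
    public static void main(String[] args) {
        Map<TopicPartition, FetchRequest.PartitionData> fetchData =
                new HashMap<TopicPartition, FetchRequest.PartitionData>();
        fetchData.put(new TopicPartition("my-topic", 0),
                new FetchRequest.PartitionData(42L, 64 * 1024));  // start offset, maxBytes
        // maxWait = 500 ms, minBytes = 1: return as soon as any data is available
        FetchRequest request = new FetchRequest(500, 1, fetchData);
        System.out.println(request.replicaId());   // -> -1 (CONSUMER_REPLICA_ID)
    }
}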
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java
deleted file mode 100644
index fc2c44e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/FetchResponse.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FetchResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
- private static final String RESPONSES_KEY_NAME = "responses";
-
- // topic level field names
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
- // partition level field names
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String ERROR_CODE_KEY_NAME = "error_code";
-
- /**
- * Possible error codes:
- *
- * OFFSET_OUT_OF_RANGE (1)
- * UNKNOWN_TOPIC_OR_PARTITION (3)
- * NOT_LEADER_FOR_PARTITION (6)
- * REPLICA_NOT_AVAILABLE (9)
- * UNKNOWN (-1)
- */
-
- private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
- private static final String RECORD_SET_KEY_NAME = "record_set";
-
- public static final long INVALID_HIGHWATERMARK = -1L;
- public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
-
- private final Map<TopicPartition, PartitionData> responseData;
-
- public static final class PartitionData {
- public final short errorCode;
- public final long highWatermark;
- public final ByteBuffer recordSet;
-
- public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
- this.errorCode = errorCode;
- this.highWatermark = highWatermark;
- this.recordSet = recordSet;
- }
- }
-
- public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
- super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
- Struct topicData = struct.instance(RESPONSES_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
- PartitionData fetchPartitionData = partitionEntry.getValue();
- Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
- partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
- partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
- partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
- partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
- this.responseData = responseData;
- }
-
- public FetchResponse(Struct struct) {
- super(struct);
- responseData = new HashMap<TopicPartition, PartitionData>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
- long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
- ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
- PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- }
-
- public Map<TopicPartition, PartitionData> responseData() {
- return responseData;
- }
-
- public static FetchResponse parse(ByteBuffer buffer) {
- return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
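
On the receiving side, responseData() exposes one PartitionData per TopicPartition, carrying the error code, high watermark, and the record-set buffer that holds the framed log entries described earlier. A draining sketch, assuming the backported classes are on the classpath (the class name FetchResponseDemo is illustrative):

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.copied.common.TopicPartition;
import org.apache.kafka.copied.common.requests.FetchResponse;

public class FetchResponseDemo {
    public static void drain(FetchResponse response) {
        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry
                : response.responseData().entrySet()) {
            FetchResponse.PartitionData data = entry.getValue();
            if (data.errorCode != 0) {       // non-zero codes are listed in the javadoc above
                System.err.println(entry.getKey() + " failed with code " + data.errorCode);
                continue;
            }
            ByteBuffer records = data.recordSet;   // framed log entries, ready to iterate
            System.out.println(entry.getKey() + ": highWatermark=" + data.highWatermark
                    + ", recordBytes=" + records.remaining());
        }
    }
}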
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java
deleted file mode 100644
index 7cb7699..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatRequest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class HeartbeatRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
- private static final String GROUP_ID_KEY_NAME = "group_id";
- private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-
- private final String groupId;
- private final int groupGenerationId;
- private final String consumerId;
-
- public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
- super(new Struct(CURRENT_SCHEMA));
- struct.set(GROUP_ID_KEY_NAME, groupId);
- struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
- this.groupId = groupId;
- this.groupGenerationId = groupGenerationId;
- this.consumerId = consumerId;
- }
-
- public HeartbeatRequest(Struct struct) {
- super(struct);
- groupId = struct.getString(GROUP_ID_KEY_NAME);
- groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- switch (versionId) {
- case 0:
- return new HeartbeatResponse(Errors.forException(e).code());
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
- }
- }
-
- public String groupId() {
- return groupId;
- }
-
- public int groupGenerationId() {
- return groupGenerationId;
- }
-
- public String consumerId() {
- return consumerId;
- }
-
- public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
- return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
- }
-
- public static HeartbeatRequest parse(ByteBuffer buffer) {
- return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java
deleted file mode 100644
index 12c5ac8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/HeartbeatResponse.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class HeartbeatResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
- private static final String ERROR_CODE_KEY_NAME = "error_code";
-
- /**
- * Possible error codes:
- *
- * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR_FOR_CONSUMER (16)
- * ILLEGAL_GENERATION (22)
- * UNKNOWN_CONSUMER_ID (25)
- */
-
- private final short errorCode;
- public HeartbeatResponse(short errorCode) {
- super(new Struct(CURRENT_SCHEMA));
- struct.set(ERROR_CODE_KEY_NAME, errorCode);
- this.errorCode = errorCode;
- }
-
- public HeartbeatResponse(Struct struct) {
- super(struct);
- errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
- }
-
- public short errorCode() {
- return errorCode;
- }
-
- public static HeartbeatResponse parse(ByteBuffer buffer) {
- return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
\ No newline at end of file
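
A heartbeat response carries only the error code; 0 denotes success and non-zero values map to the codes listed in the javadoc above. A round-trip sketch, assuming the backported classes are on the classpath (the class name HeartbeatResponseDemo is illustrative):

import java.nio.ByteBuffer;
import org.apache.kafka.copied.common.requests.HeartbeatResponse;

public class HeartbeatResponseDemo {
    public static void main(String[] args) {
        HeartbeatResponse ok = new HeartbeatResponse((short) 0);  // 0 = no error
        ByteBuffer buf = ByteBuffer.allocate(ok.sizeOf());
        ok.writeTo(buf);
        buf.flip();
        System.out.println(HeartbeatResponse.parse(buf).errorCode());   // -> 0
    }
}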
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java
deleted file mode 100644
index 011bb9d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupRequest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class JoinGroupRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
- private static final String GROUP_ID_KEY_NAME = "group_id";
- private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
- private static final String TOPICS_KEY_NAME = "topics";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
- private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
-
- public static final String UNKNOWN_CONSUMER_ID = "";
-
- private final String groupId;
- private final int sessionTimeout;
- private final List<String> topics;
- private final String consumerId;
- private final String strategy;
-
- public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
- super(new Struct(CURRENT_SCHEMA));
- struct.set(GROUP_ID_KEY_NAME, groupId);
- struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
- struct.set(TOPICS_KEY_NAME, topics.toArray());
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
- struct.set(STRATEGY_KEY_NAME, strategy);
- this.groupId = groupId;
- this.sessionTimeout = sessionTimeout;
- this.topics = topics;
- this.consumerId = consumerId;
- this.strategy = strategy;
- }
-
- public JoinGroupRequest(Struct struct) {
- super(struct);
- groupId = struct.getString(GROUP_ID_KEY_NAME);
- sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
- Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
- topics = new ArrayList<String>();
- for (Object topic: topicsArray)
- topics.add((String) topic);
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
- strategy = struct.getString(STRATEGY_KEY_NAME);
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- switch (versionId) {
- case 0:
- return new JoinGroupResponse(
- Errors.forException(e).code(),
- JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_CONSUMER_ID,
- Collections.<TopicPartition>emptyList());
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
- }
- }
-
- public String groupId() {
- return groupId;
- }
-
- public int sessionTimeout() {
- return sessionTimeout;
- }
-
- public List<String> topics() {
- return topics;
- }
-
- public String consumerId() {
- return consumerId;
- }
-
- public String strategy() {
- return strategy;
- }
-
- public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
- return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
- }
-
- public static JoinGroupRequest parse(ByteBuffer buffer) {
- return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
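Building a first-time join request needs only the fields defined above; the UNKNOWN_CONSUMER_ID sentinel signals that the coordinator has not yet assigned an id. A sketch, where "range" is an illustrative strategy name rather than a value taken from this file:

    import java.util.Arrays;

    JoinGroupRequest join = new JoinGroupRequest(
            "my-group",                            // group_id
            30000,                                 // session_timeout (ms)
            Arrays.asList("topic-a", "topic-b"),   // subscribed topics
            JoinGroupRequest.UNKNOWN_CONSUMER_ID,  // first join: empty consumer id
            "range");                              // partition_assignment_strategy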
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java
deleted file mode 100644
index 5855e94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/JoinGroupResponse.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class JoinGroupResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
- private static final String ERROR_CODE_KEY_NAME = "error_code";
-
- /**
- * Possible error codes:
- *
- * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR_FOR_CONSUMER (16)
- * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
- * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
- * UNKNOWN_CONSUMER_ID (25)
- * INVALID_SESSION_TIMEOUT (26)
- */
-
- private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
- private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- public static final int UNKNOWN_GENERATION_ID = -1;
- public static final String UNKNOWN_CONSUMER_ID = "";
-
- private final short errorCode;
- private final int generationId;
- private final String consumerId;
- private final List<TopicPartition> assignedPartitions;
-
- public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
- super(new Struct(CURRENT_SCHEMA));
-
- Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
-
- struct.set(ERROR_CODE_KEY_NAME, errorCode);
- struct.set(GENERATION_ID_KEY_NAME, generationId);
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
- Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, entries.getKey());
- topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
- topicArray.add(topicData);
- }
- struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
-
- this.errorCode = errorCode;
- this.generationId = generationId;
- this.consumerId = consumerId;
- this.assignedPartitions = assignedPartitions;
- }
-
- public JoinGroupResponse(Struct struct) {
- super(struct);
- assignedPartitions = new ArrayList<TopicPartition>();
- for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
- Struct topicData = (Struct) topicDataObj;
- String topic = topicData.getString(TOPIC_KEY_NAME);
- for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
- assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
- }
- errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
- generationId = struct.getInt(GENERATION_ID_KEY_NAME);
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
- }
-
- public short errorCode() {
- return errorCode;
- }
-
- public int generationId() {
- return generationId;
- }
-
- public String consumerId() {
- return consumerId;
- }
-
- public List<TopicPartition> assignedPartitions() {
- return assignedPartitions;
- }
-
- public static JoinGroupResponse parse(ByteBuffer buffer) {
- return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
\ No newline at end of file
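On a successful join the consumer records the generation id and its coordinator-assigned consumer id (both needed for subsequent heartbeats and offset commits) and starts fetching the assigned partitions. A sketch, where buffer is assumed to hold a response body received from the coordinator:

    JoinGroupResponse response = JoinGroupResponse.parse(buffer);
    if (response.errorCode() == 0) {
        int generationId = response.generationId();   // echo back in heartbeats
        String consumerId = response.consumerId();    // replaces UNKNOWN_CONSUMER_ID
        for (TopicPartition partition : response.assignedPartitions()) {
            // begin fetching this partition
        }
    } else {
        // e.g. UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24): fix the request and retry
    }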
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java
deleted file mode 100644
index 299de92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetRequest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ListOffsetRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
- private static final String REPLICA_ID_KEY_NAME = "replica_id";
- private static final String TOPICS_KEY_NAME = "topics";
-
- // topic level field names
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- // partition level field names
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String TIMESTAMP_KEY_NAME = "timestamp";
- private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
-
- private final int replicaId;
- private final Map<TopicPartition, PartitionData> offsetData;
-
- public static final class PartitionData {
- public final long timestamp;
- public final int maxNumOffsets;
-
- public PartitionData(long timestamp, int maxNumOffsets) {
- this.timestamp = timestamp;
- this.maxNumOffsets = maxNumOffsets;
- }
- }
-
- public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
- this(-1, offsetData);
- }
-
- public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
- super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
-
- struct.set(REPLICA_ID_KEY_NAME, replicaId);
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
- Struct topicData = struct.instance(TOPICS_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
- PartitionData offsetPartitionData = partitionEntry.getValue();
- Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
- partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
- partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
- partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(TOPICS_KEY_NAME, topicArray.toArray());
- this.replicaId = replicaId;
- this.offsetData = offsetData;
- }
-
- public ListOffsetRequest(Struct struct) {
- super(struct);
- replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
- offsetData = new HashMap<TopicPartition, PartitionData>();
- for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
- int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
- PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets);
- offsetData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
-
- for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
- ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
- responseData.put(entry.getKey(), partitionResponse);
- }
-
- switch (versionId) {
- case 0:
- return new ListOffsetResponse(responseData);
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
- }
- }
-
- public int replicaId() {
- return replicaId;
- }
-
- public Map<TopicPartition, PartitionData> offsetData() {
- return offsetData;
- }
-
- public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
- return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
- }
-
- public static ListOffsetRequest parse(ByteBuffer buffer) {
- return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
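A sketch of building the request from a plain client. The single-argument constructor fills in replica_id = -1; the timestamp sentinels follow the long-standing Kafka convention where -1 asks for the latest offset and -2 for the earliest (a convention assumed here, not defined in this file):

    import java.util.HashMap;
    import java.util.Map;

    Map<TopicPartition, ListOffsetRequest.PartitionData> offsets =
            new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
    offsets.put(new TopicPartition("topic-a", 0),
            new ListOffsetRequest.PartitionData(-1L, 1));  // latest offset, one result

    ListOffsetRequest request = new ListOffsetRequest(offsets);  // replica_id = -1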
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java
deleted file mode 100644
index d7de2df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ListOffsetResponse.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ListOffsetResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
- private static final String RESPONSES_KEY_NAME = "responses";
-
- // topic level field names
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
- // partition level field names
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String ERROR_CODE_KEY_NAME = "error_code";
-
- /**
- * Possible error codes:
- *
- * UNKNOWN_TOPIC_OR_PARTITION (3)
- * NOT_LEADER_FOR_PARTITION (6)
- * UNKNOWN (-1)
- */
-
- private static final String OFFSETS_KEY_NAME = "offsets";
-
- private final Map<TopicPartition, PartitionData> responseData;
-
- public static final class PartitionData {
- public final short errorCode;
- public final List<Long> offsets;
-
- public PartitionData(short errorCode, List<Long> offsets) {
- this.errorCode = errorCode;
- this.offsets = offsets;
- }
- }
-
- public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
- super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
- Struct topicData = struct.instance(RESPONSES_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
- PartitionData offsetPartitionData = partitionEntry.getValue();
- Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
- partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
- partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode);
- partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
- this.responseData = responseData;
- }
-
- public ListOffsetResponse(Struct struct) {
- super(struct);
- responseData = new HashMap<TopicPartition, PartitionData>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
- Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
- List<Long> offsetsList = new ArrayList<Long>();
- for (Object offset: offsets)
- offsetsList.add((Long) offset);
- PartitionData partitionData = new PartitionData(errorCode, offsetsList);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- }
-
- public Map<TopicPartition, PartitionData> responseData() {
- return responseData;
- }
-
- public static ListOffsetResponse parse(ByteBuffer buffer) {
- return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
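Reading the answer mirrors the request structure: one PartitionData per requested partition, carrying either an error code or up to max_num_offsets offsets. A sketch, assuming buffer holds a broker response for the request built in the previous sketch:

    ListOffsetResponse response = ListOffsetResponse.parse(buffer);
    ListOffsetResponse.PartitionData data =
            response.responseData().get(new TopicPartition("topic-a", 0));
    if (data.errorCode == 0 && !data.offsets.isEmpty()) {
        long latestOffset = data.offsets.get(0);  // max_num_offsets was 1
    }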