You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:26:00 UTC
[43/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java
deleted file mode 100644
index 7a90171..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/MultiSend.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A set of composite sends, sent one after another
- */
-public class MultiSend implements Send {
-
- private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
- private String dest;
- private long totalWritten = 0;
- private List<Send> sends;
- private Iterator<Send> sendsIterator;
- private Send current;
- private boolean doneSends = false;
- private long size = 0;
-
- public MultiSend(String dest, List<Send> sends) {
- this.dest = dest;
- this.sends = sends;
- this.sendsIterator = sends.iterator();
- nextSendOrDone();
- for (Send send: sends)
- this.size += send.size();
- }
-
- @Override
- public long size() {
- return size;
- }
-
- @Override
- public String destination() {
- return dest;
- }
-
- @Override
- public boolean completed() {
- if (doneSends) {
- if (totalWritten != size)
- log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten);
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public long writeTo(GatheringByteChannel channel) throws IOException {
- if (completed())
- throw new KafkaException("This operation cannot be completed on a complete request.");
-
- int totalWrittenPerCall = 0;
- boolean sendComplete = false;
- do {
- long written = current.writeTo(channel);
- totalWritten += written;
- totalWrittenPerCall += written;
- sendComplete = current.completed();
- if (sendComplete)
- nextSendOrDone();
- } while (!completed() && sendComplete);
- if (log.isTraceEnabled())
- log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + size);
- return totalWrittenPerCall;
- }
-
- // update current if there's a next Send, mark sends as done if there isn't
- private void nextSendOrDone() {
- if (sendsIterator.hasNext())
- current = sendsIterator.next();
- else
- doneSends = true;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
deleted file mode 100644
index c0d5d99..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkReceive.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkReceive implements Receive {
-
- public final static String UNKNOWN_SOURCE = "";
- public final static int UNLIMITED = -1;
-
- private final String source;
- private final ByteBuffer size;
- private final int maxSize;
- private ByteBuffer buffer;
-
-
- public NetworkReceive(String source, ByteBuffer buffer) {
- this.source = source;
- this.buffer = buffer;
- this.size = null;
- this.maxSize = UNLIMITED;
- }
-
- public NetworkReceive(String source) {
- this.source = source;
- this.size = ByteBuffer.allocate(4);
- this.buffer = null;
- this.maxSize = UNLIMITED;
- }
-
- public NetworkReceive(int maxSize, String source) {
- this.source = source;
- this.size = ByteBuffer.allocate(4);
- this.buffer = null;
- this.maxSize = maxSize;
- }
-
- public NetworkReceive() {
- this(UNKNOWN_SOURCE);
- }
-
- @Override
- public String source() {
- return source;
- }
-
- @Override
- public boolean complete() {
- return !size.hasRemaining() && !buffer.hasRemaining();
- }
-
- public long readFrom(ScatteringByteChannel channel) throws IOException {
- return readFromReadableChannel(channel);
- }
-
- // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
- // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
- // This can go away after we get rid of BlockingChannel
- @Deprecated
- public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
- int read = 0;
- if (size.hasRemaining()) {
- int bytesRead = channel.read(size);
- if (bytesRead < 0)
- throw new EOFException();
- read += bytesRead;
- if (!size.hasRemaining()) {
- size.rewind();
- int receiveSize = size.getInt();
- if (receiveSize < 0)
- throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
- if (maxSize != UNLIMITED && receiveSize > maxSize)
- throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
- this.buffer = ByteBuffer.allocate(receiveSize);
- }
- }
- if (buffer != null) {
- int bytesRead = channel.read(buffer);
- if (bytesRead < 0)
- throw new EOFException();
- read += bytesRead;
- }
-
- return read;
- }
-
- public ByteBuffer payload() {
- return this.buffer;
- }
-
- // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel
- @Deprecated
- public long readCompletely(ReadableByteChannel channel) throws IOException {
- int totalRead = 0;
- while (!complete()) {
- totalRead += readFromReadableChannel(channel);
- }
- return totalRead;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
deleted file mode 100644
index 29ce09d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/NetworkSend.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
- */
-public class NetworkSend extends ByteBufferSend {
-
- public NetworkSend(String destination, ByteBuffer... buffers) {
- super(destination, sizeDelimit(buffers));
- }
-
- private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
- int size = 0;
- for (int i = 0; i < buffers.length; i++)
- size += buffers[i].remaining();
- ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
- delimited[0] = ByteBuffer.allocate(4);
- delimited[0].putInt(size);
- delimited[0].rewind();
- System.arraycopy(buffers, 0, delimited, 1, buffers.length);
- return delimited;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
deleted file mode 100644
index b799e7c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Receive.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.IOException;
-import java.nio.channels.ScatteringByteChannel;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This interface models the in-progress reading of data from a channel, where the source is identified by a string id
- */
-public interface Receive {
-
- /**
- * The id of the source from which we are receiving data.
- */
- public String source();
-
- /**
- * Are we done receiving data?
- */
- public boolean complete();
-
- /**
- * Read bytes into this receive from the given channel
- * @param channel The channel to read from
- * @return The number of bytes read
- * @throws IOException If the reading fails
- */
- public long readFrom(ScatteringByteChannel channel) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
deleted file mode 100644
index 08da141..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selectable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * An interface for asynchronous, multi-channel network I/O
- */
-public interface Selectable {
-
- /**
- * Begin establishing a socket connection to the given address, identified by the given id
- * @param id The id for this connection
- * @param address The address to connect to
- * @param sendBufferSize The send buffer for the socket
- * @param receiveBufferSize The receive buffer for the socket
- * @throws IOException If we cannot begin connecting
- */
- public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
-
- /**
- * Begin disconnecting the connection identified by the given id
- */
- public void disconnect(String id);
-
- /**
- * Wakeup this selector if it is blocked on I/O
- */
- public void wakeup();
-
- /**
- * Close this selector
- */
- public void close();
-
- /**
- * Queue the given request for sending in the subsequent {@link #poll(long)} calls
- * @param send The request to send
- */
- public void send(Send send);
-
- /**
- * Do I/O. Reads, writes, connection establishment, etc.
- * @param timeout The amount of time to block if there is nothing to do
- * @throws IOException
- */
- public void poll(long timeout) throws IOException;
-
- /**
- * The list of sends that completed on the last {@link #poll(long) poll()} call.
- */
- public List<Send> completedSends();
-
- /**
- * The list of receives that completed on the last {@link #poll(long) poll()} call.
- */
- public List<NetworkReceive> completedReceives();
-
- /**
- * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
- * call.
- */
- public List<String> disconnected();
-
- /**
- * The list of connections that completed their connection on the last {@link #poll(long) poll()}
- * call.
- */
- public List<String> connected();
-
- /**
- * Disable reads from the given connection
- * @param id The id for the connection
- */
- public void mute(String id);
-
- /**
- * Re-enable reads from the given connection
- * @param id The id for the connection
- */
- public void unmute(String id);
-
- /**
- * Disable reads from all connections
- */
- public void muteAll();
-
- /**
- * Re-enable reads from all connections
- */
- public void unmuteAll();
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
deleted file mode 100644
index a886e3b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Selector.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.apache.flink.kafka_backport.common.metrics.Measurable;
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.apache.flink.kafka_backport.common.metrics.Metrics;
-import org.apache.flink.kafka_backport.common.metrics.Sensor;
-import org.apache.flink.kafka_backport.common.metrics.stats.Avg;
-import org.apache.flink.kafka_backport.common.metrics.stats.Count;
-import org.apache.flink.kafka_backport.common.metrics.stats.Max;
-import org.apache.flink.kafka_backport.common.metrics.stats.Rate;
-import org.apache.flink.kafka_backport.common.utils.SystemTime;
-import org.apache.flink.kafka_backport.common.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A nioSelector interface for doing non-blocking multi-connection network I/O.
- * <p>
- * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
- * responses.
- * <p>
- * A connection can be added to the nioSelector associated with a string id by doing
- *
- * <pre>
- * nioSelector.connect("42", new InetSocketAddress("google.com", server.port), 64000, 64000);
- * </pre>
- *
- * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
- * the connection. The successful invocation of this method does not mean a valid connection has been established.
- *
- * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
- * connections are all done using the <code>poll()</code> call.
- *
- * <pre>
- * nioSelector.send(new NetworkSend("42", myBytes));
- * nioSelector.poll(TIMEOUT_MS);
- * </pre>
- *
- * The nioSelector maintains several lists, available via various getters, that are cleared and repopulated by each
- * call to <code>poll()</code>.
- *
- * This class is not thread safe!
- */
-public class Selector implements Selectable {
-
- private static final Logger log = LoggerFactory.getLogger(Selector.class);
-
- private final java.nio.channels.Selector nioSelector;
- private final Map<String, SelectionKey> keys;
- private final List<Send> completedSends;
- private final List<NetworkReceive> completedReceives;
- private final List<String> disconnected;
- private final List<String> connected;
- private final List<String> failedSends;
- private final Time time;
- private final SelectorMetrics sensors;
- private final String metricGrpPrefix;
- private final Map<String, String> metricTags;
- private final Map<String, Long> lruConnections;
- private final long connectionsMaxIdleNanos;
- private final int maxReceiveSize;
- private final boolean metricsPerConnection;
- private long currentTimeNanos;
- private long nextIdleCloseCheckTime;
-
-
- /**
- * Create a new nioSelector
- */
- public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
- try {
- this.nioSelector = java.nio.channels.Selector.open();
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- this.maxReceiveSize = maxReceiveSize;
- this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
- this.time = time;
- this.metricGrpPrefix = metricGrpPrefix;
- this.metricTags = metricTags;
- this.keys = new HashMap<String, SelectionKey>();
- this.completedSends = new ArrayList<Send>();
- this.completedReceives = new ArrayList<NetworkReceive>();
- this.connected = new ArrayList<String>();
- this.disconnected = new ArrayList<String>();
- this.failedSends = new ArrayList<String>();
- this.sensors = new SelectorMetrics(metrics);
- // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
- this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
- currentTimeNanos = new SystemTime().nanoseconds();
- nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
- this.metricsPerConnection = metricsPerConnection;
- }
-
- public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
- this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
- }
-
- /**
- * Begin connecting to the given address and add the connection to this nioSelector, associated with the given
- * id.
- * <p>
- * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
- * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
- * @param id The id for the new connection
- * @param address The address to connect to
- * @param sendBufferSize The send buffer for the new connection
- * @param receiveBufferSize The receive buffer for the new connection
- * @throws IllegalStateException if there is already a connection for that id
- * @throws IOException if DNS resolution fails on the hostname or if the broker is down
- */
- @Override
- public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
- if (this.keys.containsKey(id))
- throw new IllegalStateException("There is already a connection for id " + id);
-
- SocketChannel channel = SocketChannel.open();
- channel.configureBlocking(false);
- Socket socket = channel.socket();
- socket.setKeepAlive(true);
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
- socket.setTcpNoDelay(true);
- try {
- channel.connect(address);
- } catch (UnresolvedAddressException e) {
- channel.close();
- throw new IOException("Can't resolve address: " + address, e);
- } catch (IOException e) {
- channel.close();
- throw e;
- }
- SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
- key.attach(new Transmissions(id));
- this.keys.put(id, key);
- }
-
- /**
- * Register the nioSelector with an existing channel
- * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
- * Note that we are not checking if the connection id is valid - since the connection already exists
- */
- public void register(String id, SocketChannel channel) throws ClosedChannelException {
- SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
- key.attach(new Transmissions(id));
- this.keys.put(id, key);
- }
-
- /**
- * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
- * processed until the next {@link #poll(long) poll()} call.
- */
- @Override
- public void disconnect(String id) {
- SelectionKey key = this.keys.get(id);
- if (key != null)
- key.cancel();
- }
-
- /**
- * Interrupt the nioSelector if it is blocked waiting to do I/O.
- */
- @Override
- public void wakeup() {
- this.nioSelector.wakeup();
- }
-
- /**
- * Close this selector and all associated connections
- */
- @Override
- public void close() {
- List<String> connections = new LinkedList<String>(keys.keySet());
- for (String id: connections)
- close(id);
-
- try {
- this.nioSelector.close();
- } catch (IOException e) {
- log.error("Exception closing nioSelector:", e);
- }
- }
-
- /**
- * Queue the given request for sending in the subsequent {@link #poll(long)} calls
- * @param send The request to send
- */
- public void send(Send send) {
- SelectionKey key = keyForId(send.destination());
- Transmissions transmissions = transmissions(key);
- if (transmissions.hasSend())
- throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
- transmissions.send = send;
- try {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } catch (CancelledKeyException e) {
- close(transmissions.id);
- this.failedSends.add(send.destination());
- }
- }
-
- /**
- * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
- * disconnections, initiating new sends, or making progress on in-progress sends or receives.
- *
- * When this call is completed the user can check for completed sends, receives, connections or disconnects using
- * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
- * lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if any
- * completed I/O.
- *
- * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
- * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
- * already an in-progress send
- */
- @Override
- public void poll(long timeout) throws IOException {
- clear();
-
- /* check ready keys */
- long startSelect = time.nanoseconds();
- int readyKeys = select(timeout);
- long endSelect = time.nanoseconds();
- currentTimeNanos = endSelect;
- this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
-
- if (readyKeys > 0) {
- Set<SelectionKey> keys = this.nioSelector.selectedKeys();
- Iterator<SelectionKey> iter = keys.iterator();
- while (iter.hasNext()) {
- SelectionKey key = iter.next();
- iter.remove();
-
- Transmissions transmissions = transmissions(key);
- SocketChannel channel = channel(key);
-
- // register all per-connection metrics at once
- sensors.maybeRegisterConnectionMetrics(transmissions.id);
- lruConnections.put(transmissions.id, currentTimeNanos);
-
- try {
- /* complete any connections that have finished their handshake */
- if (key.isConnectable()) {
- channel.finishConnect();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
- this.connected.add(transmissions.id);
- this.sensors.connectionCreated.record();
- log.debug("Connection {} created", transmissions.id);
- }
-
- /* read from any connections that have readable data */
- if (key.isReadable()) {
- if (!transmissions.hasReceive())
- transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
- try {
- transmissions.receive.readFrom(channel);
- } catch (InvalidReceiveException e) {
- log.error("Invalid data received from " + transmissions.id + " closing connection", e);
- close(transmissions.id);
- this.disconnected.add(transmissions.id);
- throw e;
- }
- if (transmissions.receive.complete()) {
- transmissions.receive.payload().rewind();
- this.completedReceives.add(transmissions.receive);
- this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
- transmissions.clearReceive();
- }
- }
-
- /* write to any sockets that have space in their buffer and for which we have data */
- if (key.isWritable()) {
- transmissions.send.writeTo(channel);
- if (transmissions.send.completed()) {
- this.completedSends.add(transmissions.send);
- this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
- transmissions.clearSend();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- }
- }
-
- /* cancel any defunct sockets */
- if (!key.isValid()) {
- close(transmissions.id);
- this.disconnected.add(transmissions.id);
- }
- } catch (IOException e) {
- String desc = socketDescription(channel);
- if (e instanceof EOFException || e instanceof ConnectException)
- log.debug("Connection {} disconnected", desc);
- else
- log.warn("Error in I/O with connection to {}", desc, e);
- close(transmissions.id);
- this.disconnected.add(transmissions.id);
- }
- }
- }
- long endIo = time.nanoseconds();
- this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
- maybeCloseOldestConnection();
- }
-
- private String socketDescription(SocketChannel channel) {
- Socket socket = channel.socket();
- if (socket == null)
- return "[unconnected socket]";
- else if (socket.getInetAddress() != null)
- return socket.getInetAddress().toString();
- else
- return socket.getLocalAddress().toString();
- }
-
- @Override
- public List<Send> completedSends() {
- return this.completedSends;
- }
-
- @Override
- public List<NetworkReceive> completedReceives() {
- return this.completedReceives;
- }
-
- @Override
- public List<String> disconnected() {
- return this.disconnected;
- }
-
- @Override
- public List<String> connected() {
- return this.connected;
- }
-
- @Override
- public void mute(String id) {
- mute(this.keyForId(id));
- }
-
- private void mute(SelectionKey key) {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
- }
-
- @Override
- public void unmute(String id) {
- unmute(this.keyForId(id));
- }
-
- private void unmute(SelectionKey key) {
- key.interestOps(key.interestOps() | SelectionKey.OP_READ);
- }
-
- @Override
- public void muteAll() {
- for (SelectionKey key : this.keys.values())
- mute(key);
- }
-
- @Override
- public void unmuteAll() {
- for (SelectionKey key : this.keys.values())
- unmute(key);
- }
-
- private void maybeCloseOldestConnection() {
- if (currentTimeNanos > nextIdleCloseCheckTime) {
- if (lruConnections.isEmpty()) {
- nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
- } else {
- Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
- Long connectionLastActiveTime = oldestConnectionEntry.getValue();
- nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
- if (currentTimeNanos > nextIdleCloseCheckTime) {
- String connectionId = oldestConnectionEntry.getKey();
- if (log.isTraceEnabled())
- log.trace("About to close the idle connection from " + connectionId
- + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
-
- disconnected.add(connectionId);
- close(connectionId);
- }
- }
- }
- }
-
- /**
- * Clear the results from the prior poll
- */
- private void clear() {
- this.completedSends.clear();
- this.completedReceives.clear();
- this.connected.clear();
- this.disconnected.clear();
- this.disconnected.addAll(this.failedSends);
- this.failedSends.clear();
- }
-
- /**
- * Check for data, waiting up to the given timeout.
- *
- * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
- * @return The number of keys ready
- * @throws IOException
- */
- private int select(long ms) throws IOException {
- if (ms == 0L)
- return this.nioSelector.selectNow();
- else if (ms < 0L)
- return this.nioSelector.select();
- else
- return this.nioSelector.select(ms);
- }
-
- /**
- * Begin closing this connection
- */
- public void close(String id) {
- SelectionKey key = keyForId(id);
- lruConnections.remove(id);
- SocketChannel channel = channel(key);
- Transmissions trans = transmissions(key);
- if (trans != null) {
- this.keys.remove(trans.id);
- trans.clearReceive();
- trans.clearSend();
- }
- key.attach(null);
- key.cancel();
- try {
- channel.socket().close();
- channel.close();
- } catch (IOException e) {
- log.error("Exception closing connection to node {}:", trans.id, e);
- }
- this.sensors.connectionClosed.record();
- }
-
- /**
- * Get the selection key associated with this connection id
- */
- private SelectionKey keyForId(String id) {
- SelectionKey key = this.keys.get(id);
- if (key == null)
- throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
- return key;
- }
-
- /**
- * Get the transmissions for the given connection
- */
- private Transmissions transmissions(SelectionKey key) {
- return (Transmissions) key.attachment();
- }
-
- /**
- * Get the socket channel associated with this selection key
- */
- private SocketChannel channel(SelectionKey key) {
- return (SocketChannel) key.channel();
- }
-
- /**
- * The id and in-progress send and receive associated with a connection
- */
- private static class Transmissions {
- public String id;
- public Send send;
- public NetworkReceive receive;
-
- public Transmissions(String id) {
- this.id = id;
- }
-
- public boolean hasSend() {
- return this.send != null;
- }
-
- public void clearSend() {
- this.send = null;
- }
-
- public boolean hasReceive() {
- return this.receive != null;
- }
-
- public void clearReceive() {
- this.receive = null;
- }
- }
-
- private class SelectorMetrics {
- private final Metrics metrics;
- public final Sensor connectionClosed;
- public final Sensor connectionCreated;
- public final Sensor bytesTransferred;
- public final Sensor bytesSent;
- public final Sensor bytesReceived;
- public final Sensor selectTime;
- public final Sensor ioTime;
-
- public SelectorMetrics(Metrics metrics) {
- this.metrics = metrics;
- String metricGrpName = metricGrpPrefix + "-metrics";
- StringBuilder tagsSuffix = new StringBuilder();
-
- for (Map.Entry<String, String> tag: metricTags.entrySet()) {
- tagsSuffix.append(tag.getKey());
- tagsSuffix.append("-");
- tagsSuffix.append(tag.getValue());
- }
-
- this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
- MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
- this.connectionClosed.add(metricName, new Rate());
-
- this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
- metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
- this.connectionCreated.add(metricName, new Rate());
-
- this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
- metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
- bytesTransferred.add(metricName, new Rate(new Count()));
-
- this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
- metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
- this.bytesSent.add(metricName, new Rate());
- metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
- this.bytesSent.add(metricName, new Rate(new Count()));
- metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
- this.bytesSent.add(metricName, new Avg());
- metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
- this.bytesSent.add(metricName, new Max());
-
- this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
- metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
- this.bytesReceived.add(metricName, new Rate());
- metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
- this.bytesReceived.add(metricName, new Rate(new Count()));
-
- this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
- metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
- this.selectTime.add(metricName, new Rate(new Count()));
- metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
- this.selectTime.add(metricName, new Avg());
- metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
- this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-
- this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
- metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
- this.ioTime.add(metricName, new Avg());
- metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
- this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-
- metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
- this.metrics.addMetric(metricName, new Measurable() {
- public double measure(MetricConfig config, long now) {
- return keys.size();
- }
- });
- }
-
- public void maybeRegisterConnectionMetrics(String connectionId) {
- if (!connectionId.isEmpty() && metricsPerConnection) {
- // if one sensor of the metrics has been registered for the connection,
- // then all other sensors should have been registered; and vice versa
- String nodeRequestName = "node-" + connectionId + ".bytes-sent";
- Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
- if (nodeRequest == null) {
- String metricGrpName = metricGrpPrefix + "-node-metrics";
-
- Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
- tags.put("node-id", "node-" + connectionId);
-
- nodeRequest = this.metrics.sensor(nodeRequestName);
- MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
- nodeRequest.add(metricName, new Rate());
- metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
- nodeRequest.add(metricName, new Rate(new Count()));
- metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
- nodeRequest.add(metricName, new Avg());
- metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
- nodeRequest.add(metricName, new Max());
-
- String nodeResponseName = "node-" + connectionId + ".bytes-received";
- Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
- metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
- nodeResponse.add(metricName, new Rate());
- metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
- nodeResponse.add(metricName, new Rate(new Count()));
-
- String nodeTimeName = "node-" + connectionId + ".latency";
- Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
- metricName = new MetricName("request-latency-avg", metricGrpName, tags);
- nodeRequestTime.add(metricName, new Avg());
- metricName = new MetricName("request-latency-max", metricGrpName, tags);
- nodeRequestTime.add(metricName, new Max());
- }
- }
- }
-
- public void recordBytesSent(String connectionId, long bytes) {
- long now = time.milliseconds();
- this.bytesSent.record(bytes, now);
- if (!connectionId.isEmpty()) {
- String nodeRequestName = "node-" + connectionId + ".bytes-sent";
- Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
- if (nodeRequest != null)
- nodeRequest.record(bytes, now);
- }
- }
-
- public void recordBytesReceived(String connection, int bytes) {
- long now = time.milliseconds();
- this.bytesReceived.record(bytes, now);
- if (!connection.isEmpty()) {
- String nodeRequestName = "node-" + connection + ".bytes-received";
- Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
- if (nodeRequest != null)
- nodeRequest.record(bytes, now);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
deleted file mode 100644
index b9e8a50..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/network/Send.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.network;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This interface models the in-progress sending of data to a destination identified by a string id.
- */
-public interface Send {
-
- /**
- * The id of the destination of this send
- */
- public String destination();
-
- /**
- * Is this send complete?
- */
- public boolean completed();
-
- /**
- * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
- * to be completely written
- * @param channel The channel to write to
- * @return The number of bytes written
- * @throws IOException If the write fails
- */
- public long writeTo(GatheringByteChannel channel) throws IOException;
-
- /**
- * Size of the send
- */
- public long size();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
deleted file mode 100644
index e12261c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ApiKeys.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Identifiers for all the Kafka APIs
- */
-public enum ApiKeys {
- PRODUCE(0, "Produce"),
- FETCH(1, "Fetch"),
- LIST_OFFSETS(2, "Offsets"),
- METADATA(3, "Metadata"),
- LEADER_AND_ISR(4, "LeaderAndIsr"),
- STOP_REPLICA(5, "StopReplica"),
- UPDATE_METADATA_KEY(6, "UpdateMetadata"),
- CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
- OFFSET_COMMIT(8, "OffsetCommit"),
- OFFSET_FETCH(9, "OffsetFetch"),
- CONSUMER_METADATA(10, "ConsumerMetadata"),
- JOIN_GROUP(11, "JoinGroup"),
- HEARTBEAT(12, "Heartbeat");
-
- private static ApiKeys[] codeToType;
- public static final int MAX_API_KEY;
-
- static {
- int maxKey = -1;
- for (ApiKeys key : ApiKeys.values()) {
- maxKey = Math.max(maxKey, key.id);
- }
- codeToType = new ApiKeys[maxKey + 1];
- for (ApiKeys key : ApiKeys.values()) {
- codeToType[key.id] = key;
- }
- MAX_API_KEY = maxKey;
- }
-
- /** the permanent and immutable id of an API--this can't change ever */
- public final short id;
-
- /** an English description of the api--this is for debugging and can change */
- public final String name;
-
- private ApiKeys(int id, String name) {
- this.id = (short) id;
- this.name = name;
- }
-
- public static ApiKeys forId(int id) {
- return codeToType[id];
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
deleted file mode 100644
index 5ef3b24..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Errors.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-import org.apache.flink.kafka_backport.common.errors.ApiException;
-import org.apache.flink.kafka_backport.common.errors.IllegalGenerationException;
-import org.apache.flink.kafka_backport.common.errors.InvalidRequiredAcksException;
-import org.apache.flink.kafka_backport.common.errors.InvalidTopicException;
-import org.apache.flink.kafka_backport.common.errors.LeaderNotAvailableException;
-import org.apache.flink.kafka_backport.common.errors.NetworkException;
-import org.apache.flink.kafka_backport.common.errors.NotCoordinatorForConsumerException;
-import org.apache.flink.kafka_backport.common.errors.NotEnoughReplicasException;
-import org.apache.flink.kafka_backport.common.errors.NotLeaderForPartitionException;
-import org.apache.flink.kafka_backport.common.errors.OffsetLoadInProgressException;
-import org.apache.flink.kafka_backport.common.errors.OffsetMetadataTooLarge;
-import org.apache.flink.kafka_backport.common.errors.OffsetOutOfRangeException;
-import org.apache.flink.kafka_backport.common.errors.RecordBatchTooLargeException;
-import org.apache.flink.kafka_backport.common.errors.TimeoutException;
-import org.apache.flink.kafka_backport.common.errors.UnknownConsumerIdException;
-import org.apache.flink.kafka_backport.common.errors.UnknownServerException;
-import org.apache.flink.kafka_backport.common.errors.UnknownTopicOrPartitionException;
-import org.apache.flink.kafka_backport.common.errors.ConsumerCoordinatorNotAvailableException;
-import org.apache.flink.kafka_backport.common.errors.CorruptRecordException;
-import org.apache.flink.kafka_backport.common.errors.NotEnoughReplicasAfterAppendException;
-import org.apache.flink.kafka_backport.common.errors.RecordTooLargeException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
- * are thus part of the protocol. The names can be changed but the error code cannot.
- *
- * Do not add exceptions that occur only on the client or only on the server here.
- */
-public enum Errors {
- UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
- NONE(0, null),
- OFFSET_OUT_OF_RANGE(1,
- new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
- CORRUPT_MESSAGE(2,
- new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
- UNKNOWN_TOPIC_OR_PARTITION(3,
- new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
- // TODO: errorCode 4 for InvalidFetchSize
- LEADER_NOT_AVAILABLE(5,
- new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
- NOT_LEADER_FOR_PARTITION(6,
- new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
- REQUEST_TIMED_OUT(7,
- new TimeoutException("The request timed out.")),
- // TODO: errorCode 8 for BrokerNotAvailable
- REPLICA_NOT_AVAILABLE(9,
- new ApiException("The replica is not available for the requested topic-partition")),
- MESSAGE_TOO_LARGE(10,
- new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
- OFFSET_METADATA_TOO_LARGE(12,
- new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
- NETWORK_EXCEPTION(13,
- new NetworkException("The server disconnected before a response was received.")),
- OFFSET_LOAD_IN_PROGRESS(14,
- new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
- CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
- new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")),
- NOT_COORDINATOR_FOR_CONSUMER(16,
- new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")),
- INVALID_TOPIC_EXCEPTION(17,
- new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
- RECORD_LIST_TOO_LARGE(18,
- new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
- NOT_ENOUGH_REPLICAS(19,
- new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
- NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
- new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
- INVALID_REQUIRED_ACKS(21,
- new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
- ILLEGAL_GENERATION(22,
- new IllegalGenerationException("Specified consumer generation id is not valid.")),
- INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23,
- new ApiException("The request partition assignment strategy does not match that of the group.")),
- UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24,
- new ApiException("The request partition assignment strategy is unknown to the broker.")),
- UNKNOWN_CONSUMER_ID(25,
- new UnknownConsumerIdException("The coordinator is not aware of this consumer.")),
- INVALID_SESSION_TIMEOUT(26,
- new ApiException("The session timeout is not within an acceptable range.")),
- COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
- new ApiException("Some of the committing partitions are not assigned the committer")),
- INVALID_COMMIT_OFFSET_SIZE(28,
- new ApiException("The committing offset data size is not valid"));
-
- private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
- private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
-
- static {
- for (Errors error : Errors.values()) {
- codeToError.put(error.code(), error);
- if (error.exception != null)
- classToError.put(error.exception.getClass(), error);
- }
- }
-
- private final short code;
- private final ApiException exception;
-
- private Errors(int code, ApiException exception) {
- this.code = (short) code;
- this.exception = exception;
- }
-
- /**
- * An instance of the exception
- */
- public ApiException exception() {
- return this.exception;
- }
-
- /**
- * The error code for the exception
- */
- public short code() {
- return this.code;
- }
-
- /**
- * Throw the exception corresponding to this error if there is one
- */
- public void maybeThrow() {
- if (exception != null) {
- throw this.exception;
- }
- }
-
- /**
- * Return the error instance associated with this error code (or UNKNOWN if there is none)
- */
- public static Errors forCode(short code) {
- Errors error = codeToError.get(code);
- return error == null ? UNKNOWN : error;
- }
-
- /**
- * Return the error instance associated with this exception (or UNKNOWN if there is none)
- */
- public static Errors forException(Throwable t) {
- Errors error = classToError.get(t.getClass());
- return error == null ? UNKNOWN : error;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
deleted file mode 100644
index e5ae9a4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/ProtoUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Static helpers that look up request/response schemas by API key and version in the
- * {@code Protocol} tables and parse buffers with them.
- */
-public class ProtoUtils {
-
-    /**
-     * Looks up a schema in a table indexed as {@code schemas[apiKey][version]}.
-     *
-     * @param schemas schema table, first indexed by API key and then by version
-     * @param apiKey index into the first dimension of {@code schemas}
-     * @param version index into the version array selected by {@code apiKey}
-     * @return the entry stored at {@code schemas[apiKey][version]}
-     * @throws IllegalArgumentException if {@code apiKey} or {@code version} is negative or not
-     *         smaller than the length of the array it indexes
-     */
-    private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
-        // Valid indices are 0..length-1, so an index equal to the length must be rejected too;
-        // otherwise the array access below fails with ArrayIndexOutOfBoundsException.
-        if (apiKey < 0 || apiKey >= schemas.length) {
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        }
-        Schema[] versions = schemas[apiKey];
-        if (version < 0 || version >= versions.length) {
-            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
-        }
-        return versions[version];
-    }
-
-    /**
-     * Returns the entry of {@code Protocol.CURR_VERSION} for the given API key.
-     *
-     * @param apiKey index into {@code Protocol.CURR_VERSION}
-     * @return the version stored for that API key
-     * @throws IllegalArgumentException if {@code apiKey} is not a valid index
-     */
-    public static short latestVersion(int apiKey) {
-        if (apiKey < 0 || apiKey >= Protocol.CURR_VERSION.length) {
-            throw new IllegalArgumentException("Invalid api key: " + apiKey);
-        }
-        return Protocol.CURR_VERSION[apiKey];
-    }
-
-    /**
-     * Returns the request schema for the given API key and version.
-     *
-     * @throws IllegalArgumentException if {@code apiKey} or {@code version} is out of range
-     */
-    public static Schema requestSchema(int apiKey, int version) {
-        return schemaFor(Protocol.REQUESTS, apiKey, version);
-    }
-
-    /**
-     * Returns the request schema for the given API key at the version reported by
-     * {@link #latestVersion(int)}.
-     *
-     * @throws IllegalArgumentException if {@code apiKey} is out of range
-     */
-    public static Schema currentRequestSchema(int apiKey) {
-        return requestSchema(apiKey, latestVersion(apiKey));
-    }
-
-    /**
-     * Returns the response schema for the given API key and version.
-     *
-     * @throws IllegalArgumentException if {@code apiKey} or {@code version} is out of range
-     */
-    public static Schema responseSchema(int apiKey, int version) {
-        return schemaFor(Protocol.RESPONSES, apiKey, version);
-    }
-
-    /**
-     * Returns the response schema for the given API key at the version reported by
-     * {@link #latestVersion(int)}.
-     *
-     * @throws IllegalArgumentException if {@code apiKey} is out of range
-     */
-    public static Schema currentResponseSchema(int apiKey) {
-        return responseSchema(apiKey, latestVersion(apiKey));
-    }
-
-    /**
-     * Reads a request from the buffer using the request schema for the given API key and version.
-     *
-     * @param apiKey API key selecting the request schema
-     * @param version version selecting the request schema
-     * @param buffer buffer handed to the schema's {@code read} method
-     * @return the value read from the buffer, cast to {@code Struct}
-     * @throws IllegalArgumentException if {@code apiKey} or {@code version} is out of range
-     */
-    public static Struct parseRequest(int apiKey, int version, ByteBuffer buffer) {
-        return (Struct) requestSchema(apiKey, version).read(buffer);
-    }
-
-    /**
-     * Reads a response from the buffer using the current response schema for the given API key.
-     *
-     * @param apiKey API key selecting the response schema
-     * @param buffer buffer handed to the schema's {@code read} method
-     * @return the value read from the buffer, cast to {@code Struct}
-     * @throws IllegalArgumentException if {@code apiKey} is out of range
-     */
-    public static Struct parseResponse(int apiKey, ByteBuffer buffer) {
-        return (Struct) currentResponseSchema(apiKey).read(buffer);
-    }
-
-}