You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/19 06:53:37 UTC
[3/4] kafka git commit: kafka-1690;
Add SSL support to Kafka Broker, Producer and Consumer;
patched by Sriharsha Chintalapani;
reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin,
Jiangjie Qin and Jun Rao
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
new file mode 100644
index 0000000..a3567af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+/*
+ * Transport layer for PLAINTEXT communication
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+
+import java.security.Principal;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PlaintextTransportLayer implements TransportLayer {
+ private static final Logger log = LoggerFactory.getLogger(PlaintextTransportLayer.class);
+ private final SelectionKey key;
+ private final SocketChannel socketChannel;
+ private final Principal principal = new KafkaPrincipal("ANONYMOUS");
+
+ public PlaintextTransportLayer(SelectionKey key) throws IOException {
+ this.key = key;
+ this.socketChannel = (SocketChannel) key.channel();
+ }
+
+ @Override
+ public boolean ready() {
+ return true;
+ }
+
+ @Override
+ public void finishConnect() throws IOException {
+ socketChannel.finishConnect();
+ int ops = key.interestOps();
+ ops &= ~SelectionKey.OP_CONNECT;
+ ops |= SelectionKey.OP_READ;
+ key.interestOps(ops);
+ }
+
+ @Override
+ public void disconnect() {
+ key.cancel();
+ }
+
+ @Override
+ public SocketChannel socketChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return socketChannel.isOpen();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return socketChannel.isConnected();
+ }
+
+ /**
+ * Closes this channel
+ *
+ * @throws IOException If I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ socketChannel.socket().close();
+ socketChannel.close();
+ key.attach(null);
+ key.cancel();
+ }
+
+ /**
+ * Performs SSL handshake hence is a no-op for the non-secure
+ * implementation
+ * @throws IOException
+ */
+ @Override
+ public void handshake() throws IOException {}
+
+ /**
+ * Reads a sequence of bytes from this channel into the given buffer.
+ *
+ * @param dst The buffer into which bytes are to be transferred
+ * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
+ * @throws IOException if some other I/O error occurs
+ */
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ return socketChannel.read(dst);
+ }
+
+ /**
+ * Reads a sequence of bytes from this channel into the given buffers.
+ *
+ * @param dsts - The buffers into which bytes are to be transferred.
+ * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+ * @throws IOException if some other I/O error occurs
+ */
+ @Override
+ public long read(ByteBuffer[] dsts) throws IOException {
+ return socketChannel.read(dsts);
+ }
+
+ /**
+ * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
+ * @param dsts - The buffers into which bytes are to be transferred
+ * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
+ * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
+ * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+ * @throws IOException if some other I/O error occurs
+ */
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+ return socketChannel.read(dsts, offset, length);
+ }
+
+ /**
+ * Writes a sequence of bytes to this channel from the given buffer.
+ *
+ * @param src The buffer from which bytes are to be retrieved
+ * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ return socketChannel.write(src);
+ }
+
+ /**
+ * Writes a sequence of bytes to this channel from the given buffer.
+ *
+ * @param srcs The buffer from which bytes are to be retrieved
+ * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException {
+ return socketChannel.write(srcs);
+ }
+
+ /**
+ * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
+ *
+ * @param srcs The buffers from which bytes are to be retrieved
+ * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
+ * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
+ * @return returns no.of bytes written , possibly zero.
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+ return socketChannel.write(srcs, offset, length);
+ }
+
+ /**
+ * always returns false as there will be not be any
+ * pending writes since we directly write to socketChannel.
+ */
+ @Override
+ public boolean hasPendingWrites() {
+ return false;
+ }
+
+ /**
+ * Returns ANONYMOUS as Principal.
+ */
+ @Override
+ public Principal peerPrincipal() throws IOException {
+ return principal;
+ }
+
+ /**
+ * Adds the interestOps to selectionKey.
+ * @param interestOps
+ */
+ @Override
+ public void addInterestOps(int ops) {
+ key.interestOps(key.interestOps() | ops);
+
+ }
+
+ /**
+ * Removes the interestOps from selectionKey.
+ * @param interestOps
+ */
+ @Override
+ public void removeInterestOps(int ops) {
+ key.interestOps(key.interestOps() & ~ops);
+ }
+
+ @Override
+ public boolean isMute() {
+ return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
new file mode 100644
index 0000000..88c218b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLChannelBuilder implements ChannelBuilder {
+ private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class);
+ private SSLFactory sslFactory;
+ private PrincipalBuilder principalBuilder;
+ private SSLFactory.Mode mode;
+
+ public SSLChannelBuilder(SSLFactory.Mode mode) {
+ this.mode = mode;
+ }
+
+ public void configure(Map<String, ?> configs) throws KafkaException {
+ try {
+ this.sslFactory = new SSLFactory(mode);
+ this.sslFactory.configure(configs);
+ this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+ this.principalBuilder.configure(configs);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+ KafkaChannel channel = null;
+ try {
+ SocketChannel socketChannel = (SocketChannel) key.channel();
+ SSLTransportLayer transportLayer = new SSLTransportLayer(id, key,
+ sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
+ socketChannel.socket().getPort()));
+ Authenticator authenticator = new DefaultAuthenticator();
+ authenticator.configure(transportLayer, this.principalBuilder);
+ channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ } catch (Exception e) {
+ log.info("Failed to create channel due to ", e);
+ throw new KafkaException(e);
+ }
+ return channel;
+ }
+
+ public void close() {
+ this.principalBuilder.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
new file mode 100644
index 0000000..8ba1b01
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.CancelledKeyException;
+
+import java.security.Principal;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Transport layer for SSL communication
+ */
+
+public class SSLTransportLayer implements TransportLayer {
+ private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class);
+ private final String channelId;
+ protected final SSLEngine sslEngine;
+ private final SelectionKey key;
+ private final SocketChannel socketChannel;
+ private HandshakeStatus handshakeStatus;
+ private SSLEngineResult handshakeResult;
+ private boolean handshakeComplete = false;
+ private boolean closing = false;
+ private ByteBuffer netReadBuffer;
+ private ByteBuffer netWriteBuffer;
+ private ByteBuffer appReadBuffer;
+ private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
+
+ public SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+ this.channelId = channelId;
+ this.key = key;
+ this.socketChannel = (SocketChannel) key.channel();
+ this.sslEngine = sslEngine;
+ this.netReadBuffer = ByteBuffer.allocate(packetBufferSize());
+ this.netWriteBuffer = ByteBuffer.allocate(packetBufferSize());
+ this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
+ startHandshake();
+ }
+
+ /**
+ * starts sslEngine handshake process
+ */
+ private void startHandshake() throws IOException {
+ //clear & set netRead & netWrite buffers
+ netWriteBuffer.position(0);
+ netWriteBuffer.limit(0);
+ netReadBuffer.position(0);
+ netReadBuffer.limit(0);
+ handshakeComplete = false;
+ closing = false;
+ //initiate handshake
+ sslEngine.beginHandshake();
+ handshakeStatus = sslEngine.getHandshakeStatus();
+ }
+
+ @Override
+ public boolean ready() {
+ return handshakeComplete;
+ }
+
+ /**
+ * does socketChannel.finishConnect()
+ */
+ @Override
+ public void finishConnect() throws IOException {
+ socketChannel.finishConnect();
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+ }
+
+ /**
+ * disconnects selectionKey.
+ */
+ @Override
+ public void disconnect() {
+ key.cancel();
+ }
+
+ @Override
+ public SocketChannel socketChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return socketChannel.isOpen();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return socketChannel.isConnected();
+ }
+
+
+ /**
+ * Sends a SSL close message and closes socketChannel.
+ * @throws IOException if an I/O error occurs
+ * @throws IOException if there is data on the outgoing network buffer and we are unable to flush it
+ */
+ @Override
+ public void close() throws IOException {
+ if (closing) return;
+ closing = true;
+ sslEngine.closeOutbound();
+ try {
+ if (!flush(netWriteBuffer)) {
+ throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
+ }
+ //prep the buffer for the close message
+ netWriteBuffer.clear();
+ //perform the close, since we called sslEngine.closeOutbound
+ SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netWriteBuffer);
+ //we should be in a close state
+ if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
+ throw new IOException("Invalid close state, will not send network data.");
+ }
+ netWriteBuffer.flip();
+ flush(netWriteBuffer);
+ socketChannel.socket().close();
+ socketChannel.close();
+ } catch (IOException ie) {
+ log.warn("Failed to send SSL Close message ", ie);
+ }
+ key.attach(null);
+ key.cancel();
+ }
+
+ /**
+ * returns true if there are any pending contents in netWriteBuffer
+ */
+ @Override
+ public boolean hasPendingWrites() {
+ return netWriteBuffer.hasRemaining();
+ }
+
+ /**
+ * Flushes the buffer to the network, non blocking
+ * @param buf ByteBuffer
+ * @return boolean true if the buffer has been emptied out, false otherwise
+ * @throws IOException
+ */
+ private boolean flush(ByteBuffer buf) throws IOException {
+ int remaining = buf.remaining();
+ if (remaining > 0) {
+ int written = socketChannel.write(buf);
+ return written >= remaining;
+ }
+ return true;
+ }
+
+ /**
+ * Performs SSL handshake, non blocking.
+ * Before application data (kafka protocols) can be sent client & kafka broker must
+ * perform ssl handshake.
+ * During the handshake SSLEngine generates encrypted data that will be transported over socketChannel.
+ * Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus field is used to
+ * determine what operation needs to occur to move handshake along.
+ * A typical handshake might look like this.
+ * +-------------+----------------------------------+-------------+
+ * | client | SSL/TLS message | HSStatus |
+ * +-------------+----------------------------------+-------------+
+ * | wrap() | ClientHello | NEED_UNWRAP |
+ * | unwrap() | ServerHello/Cert/ServerHelloDone | NEED_WRAP |
+ * | wrap() | ClientKeyExchange | NEED_WRAP |
+ * | wrap() | ChangeCipherSpec | NEED_WRAP |
+ * | wrap() | Finished | NEED_UNWRAP |
+ * | unwrap() | ChangeCipherSpec | NEED_UNWRAP |
+ * | unwrap() | Finished | FINISHED |
+ * +-------------+----------------------------------+-------------+
+ *
+ * @throws IOException
+ */
+ @Override
+ public void handshake() throws IOException {
+ boolean read = key.isReadable();
+ boolean write = key.isWritable();
+ handshakeComplete = false;
+ handshakeStatus = sslEngine.getHandshakeStatus();
+ if (!flush(netWriteBuffer)) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ return;
+ }
+ try {
+ switch (handshakeStatus) {
+ case NEED_TASK:
+ log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+ channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+ handshakeStatus = runDelegatedTasks();
+ break;
+ case NEED_WRAP:
+ log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+ channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+ handshakeResult = handshakeWrap(write);
+ if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+ int currentPacketBufferSize = packetBufferSize();
+ netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentPacketBufferSize);
+ if (netWriteBuffer.position() >= currentPacketBufferSize) {
+ throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.position() +
+ ") >= network buffer size (" + currentPacketBufferSize + ")");
+ }
+ } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+ throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
+ } else if (handshakeResult.getStatus() == Status.CLOSED) {
+ throw new EOFException();
+ }
+ log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+ channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+ //if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents
+ //we will break here otherwise we can do need_unwrap in the same call.
+ if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer)) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ }
+ case NEED_UNWRAP:
+ log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+ channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+ handshakeResult = handshakeUnwrap(read);
+ if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+ int currentPacketBufferSize = packetBufferSize();
+ netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize);
+ if (netReadBuffer.position() >= currentPacketBufferSize) {
+ throw new IllegalStateException("Buffer underflow when there is available data");
+ }
+ } else if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+ int currentAppBufferSize = applicationBufferSize();
+ appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
+ if (appReadBuffer.position() > currentAppBufferSize) {
+ throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() +
+ ") > packet buffer size (" + currentAppBufferSize + ")");
+ }
+ } else if (handshakeResult.getStatus() == Status.CLOSED) {
+ throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
+ }
+ log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+ channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+
+ //if handshakeStatus completed than fall-through to finished status.
+ //after handshake is finished there is no data left to read/write in socketChannel.
+ //so the selector won't invoke this channel if we don't go through the handshakeFinished here.
+ if (handshakeStatus != HandshakeStatus.FINISHED) {
+ if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+ }
+ break;
+ }
+ case FINISHED:
+ handshakeFinished();
+ break;
+ case NOT_HANDSHAKING:
+ handshakeFinished();
+ break;
+ default:
+ throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
+ }
+ } catch (SSLException e) {
+ handshakeFailure();
+ throw e;
+ }
+ }
+
+
+ /**
+ * Executes the SSLEngine tasks needed.
+ * @return HandshakeStatus
+ */
+ private HandshakeStatus runDelegatedTasks() {
+ for (;;) {
+ Runnable task = delegatedTask();
+ if (task == null) {
+ break;
+ }
+ task.run();
+ }
+ return sslEngine.getHandshakeStatus();
+ }
+
+ /**
+ * Checks if the handshake status is finished
+ * Sets the interestOps for the selectionKey.
+ */
+ private void handshakeFinished() throws IOException {
+ // SSLEnginge.getHandshakeStatus is transient and it doesn't record FINISHED status properly.
+ // It can move from FINISHED status to NOT_HANDSHAKING after the handshake is completed.
+ // Hence we also need to check handshakeResult.getHandshakeStatus() if the handshake finished or not
+ if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) {
+ //we are complete if we have delivered the last package
+ handshakeComplete = !netWriteBuffer.hasRemaining();
+ //remove OP_WRITE if we are complete, otherwise we still have data to write
+ if (!handshakeComplete)
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ else
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+
+ log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ",
+ channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+ } else {
+ throw new IOException("NOT_HANDSHAKING during handshake");
+ }
+ }
+
+ /**
+ * Performs the WRAP function
+ * @param doWrite boolean
+ * @return SSLEngineResult
+ * @throws IOException
+ */
+ private SSLEngineResult handshakeWrap(Boolean doWrite) throws IOException {
+ log.trace("SSLHandshake handshakeWrap", channelId);
+ if (netWriteBuffer.hasRemaining())
+ throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty");
+ //this should never be called with a network buffer that contains data
+ //so we can clear it here.
+ netWriteBuffer.clear();
+ SSLEngineResult result = sslEngine.wrap(emptyBuf, netWriteBuffer);
+ //prepare the results to be written
+ netWriteBuffer.flip();
+ handshakeStatus = result.getHandshakeStatus();
+ if (result.getStatus() == SSLEngineResult.Status.OK &&
+ result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+ handshakeStatus = runDelegatedTasks();
+ }
+
+ if (doWrite) flush(netWriteBuffer);
+ return result;
+ }
+
+ /**
+ * Perform handshake unwrap
+ * @param doRead boolean
+ * @return SSLEngineResult
+ * @throws IOException
+ */
+ private SSLEngineResult handshakeUnwrap(Boolean doRead) throws IOException {
+ log.trace("SSLHandshake handshakeUnwrap", channelId);
+ SSLEngineResult result;
+ boolean cont = false;
+ int read = 0;
+ if (doRead) {
+ read = socketChannel.read(netReadBuffer);
+ if (read == -1) throw new EOFException("EOF during handshake.");
+ }
+ do {
+ //prepare the buffer with the incoming data
+ netReadBuffer.flip();
+ result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+ netReadBuffer.compact();
+ handshakeStatus = result.getHandshakeStatus();
+ if (result.getStatus() == SSLEngineResult.Status.OK &&
+ result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+ handshakeStatus = runDelegatedTasks();
+ }
+ cont = result.getStatus() == SSLEngineResult.Status.OK &&
+ handshakeStatus == HandshakeStatus.NEED_UNWRAP;
+ log.trace("SSLHandshake handshakeUnwrap: handshakeStatus ", handshakeStatus);
+ } while (netReadBuffer.position() != 0 && cont);
+
+ return result;
+ }
+
+
+ /**
+ * Reads a sequence of bytes from this channel into the given buffer.
+ *
+ * @param dst The buffer into which bytes are to be transferred
+ * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
+ * @throws IOException if some other I/O error occurs
+ */
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (closing) return -1;
+ int read = 0;
+ if (!handshakeComplete) return read;
+
+ //if we have unread decrypted data in appReadBuffer read that into dst buffer.
+ if (appReadBuffer.position() > 0) {
+ read = readFromAppBuffer(dst);
+ }
+
+ if (dst.remaining() > 0) {
+ netReadBuffer = Utils.ensureCapacity(netReadBuffer, packetBufferSize());
+ if (netReadBuffer.remaining() > 0) {
+ int netread = socketChannel.read(netReadBuffer);
+ if (netread == 0) return netread;
+ else if (netread < 0) throw new EOFException("EOF during read");
+ }
+ do {
+ netReadBuffer.flip();
+ SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+ netReadBuffer.compact();
+ // handle ssl renegotiation.
+ if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+ log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+ channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+ handshake();
+ break;
+ }
+
+ if (unwrapResult.getStatus() == Status.OK) {
+ read += readFromAppBuffer(dst);
+ } else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+ int currentApplicationBufferSize = applicationBufferSize();
+ appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
+ if (appReadBuffer.position() >= currentApplicationBufferSize) {
+ throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
+ ") >= application buffer size (" + currentApplicationBufferSize + ")");
+ }
+
+ // appReadBuffer will extended upto currentApplicationBufferSize
+ // we need to read the existing content into dst before we can do unwrap again. If there are no space in dst
+ // we can break here.
+ if (dst.hasRemaining())
+ read += readFromAppBuffer(dst);
+ else
+ break;
+ } else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+ int currentPacketBufferSize = packetBufferSize();
+ netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize);
+ if (netReadBuffer.position() >= currentPacketBufferSize) {
+ throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
+ ") > packet buffer size (" + currentPacketBufferSize + ")");
+ }
+ break;
+ } else if (unwrapResult.getStatus() == Status.CLOSED) {
+ throw new EOFException();
+ }
+ } while (netReadBuffer.position() != 0);
+ }
+ return read;
+ }
+
+
+ /**
+ * Reads a sequence of bytes from this channel into the given buffers.
+ *
+ * @param dsts - The buffers into which bytes are to be transferred.
+ * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+ * @throws IOException if some other I/O error occurs
+ */
+ @Override
+ public long read(ByteBuffer[] dsts) throws IOException {
+ return read(dsts, 0, dsts.length);
+ }
+
+
+ /**
+ * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
+ * @param dsts - The buffers into which bytes are to be transferred
+ * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
+ * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
+ * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+ * @throws IOException if some other I/O error occurs
+ */
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+ if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
+ throw new IndexOutOfBoundsException();
+
+ int totalRead = 0;
+ int i = offset;
+ while (i < length) {
+ if (dsts[i].hasRemaining()) {
+ int read = read(dsts[i]);
+ if (read > 0)
+ totalRead += read;
+ else
+ break;
+ }
+ if (!dsts[i].hasRemaining()) {
+ i++;
+ }
+ }
+ return totalRead;
+ }
+
+
+ /**
+ * Writes a sequence of bytes to this channel from the given buffer.
+ *
+ * @param src The buffer from which bytes are to be retrieved
+ * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ int written = 0;
+ if (closing) throw new IllegalStateException("Channel is in closing state");
+ if (!handshakeComplete) return written;
+
+ if (!flush(netWriteBuffer))
+ return written;
+
+ netWriteBuffer.clear();
+ SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
+ netWriteBuffer.flip();
+
+ //handle ssl renegotiation
+ if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+ handshake();
+ return written;
+ }
+
+ if (wrapResult.getStatus() == Status.OK) {
+ written = wrapResult.bytesConsumed();
+ flush(netWriteBuffer);
+ } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+ int currentPacketBufferSize = packetBufferSize();
+ netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, packetBufferSize());
+ if (netWriteBuffer.position() >= currentPacketBufferSize)
+ throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.position() + ") >= network buffer size (" + currentPacketBufferSize + ")");
+ } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+ throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
+ } else if (wrapResult.getStatus() == Status.CLOSED) {
+ throw new EOFException();
+ }
+ return written;
+ }
+
+ /**
+ * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
+ *
+ * @param srcs The buffers from which bytes are to be retrieved
+ * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
+ * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
+ * @return returns no.of bytes written , possibly zero.
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+ if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
+ throw new IndexOutOfBoundsException();
+ int totalWritten = 0;
+ int i = offset;
+ while (i < length) {
+ if (srcs[i].hasRemaining() || hasPendingWrites()) {
+ int written = write(srcs[i]);
+ if (written > 0) {
+ totalWritten += written;
+ }
+ }
+ if (!srcs[i].hasRemaining() && !hasPendingWrites()) {
+ i++;
+ } else {
+ // if we are unable to write the current buffer to socketChannel we should break,
+ // as we might have reached max socket send buffer size.
+ break;
+ }
+ }
+ return totalWritten;
+ }
+
+ /**
+ * Writes a sequence of bytes to this channel from the given buffers.
+ *
+ * @param srcs The buffers from which bytes are to be retrieved
+ * @return returns no.of bytes consumed by SSLEngine.wrap , possibly zero.
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException {
+ return write(srcs, 0, srcs.length);
+ }
+
+
+ /**
+ * SSLSession's peerPrincipal for the remote host.
+ * @return Principal
+ */
+ public Principal peerPrincipal() throws IOException {
+ try {
+ return sslEngine.getSession().getPeerPrincipal();
+ } catch (SSLPeerUnverifiedException se) {
+ throw new IOException(String.format("Unable to retrieve getPeerPrincipal due to %s", se));
+ }
+ }
+
+ /**
+ * returns a SSL Session after the handshake is established
+ * throws IllegalStateException if the handshake is not established
+ */
+ public SSLSession sslSession() throws IllegalStateException {
+ return sslEngine.getSession();
+ }
+
+ /**
+ * Adds interestOps to SelectionKey of the TransportLayer
+ * @param ops SelectionKey interestOps
+ */
+ @Override
+ public void addInterestOps(int ops) {
+ if (!key.isValid())
+ throw new CancelledKeyException();
+ else if (!handshakeComplete)
+ throw new IllegalStateException("handshake is not completed");
+
+ key.interestOps(key.interestOps() | ops);
+ }
+
+ /**
+ * removes interestOps to SelectionKey of the TransportLayer
+ * @param ops SelectionKey interestOps
+ */
+ @Override
+ public void removeInterestOps(int ops) {
+ if (!key.isValid())
+ throw new CancelledKeyException();
+ else if (!handshakeComplete)
+ throw new IllegalStateException("handshake is not completed");
+
+ key.interestOps(key.interestOps() & ~ops);
+ }
+
+
+ /**
+ * returns delegatedTask for the SSLEngine.
+ */
+ protected Runnable delegatedTask() {
+ return sslEngine.getDelegatedTask();
+ }
+
+ /**
+ * transfers appReadBuffer contents (decrypted data) into dst bytebuffer
+ * @param dst ByteBuffer
+ */
+ private int readFromAppBuffer(ByteBuffer dst) {
+ appReadBuffer.flip();
+ int remaining = Math.min(appReadBuffer.remaining(), dst.remaining());
+ if (remaining > 0) {
+ int limit = appReadBuffer.limit();
+ appReadBuffer.limit(appReadBuffer.position() + remaining);
+ dst.put(appReadBuffer);
+ appReadBuffer.limit(limit);
+ }
+ appReadBuffer.compact();
+ return remaining;
+ }
+
+ private int packetBufferSize() {
+ return sslEngine.getSession().getPacketBufferSize();
+ }
+
+ private int applicationBufferSize() {
+ return sslEngine.getSession().getApplicationBufferSize();
+ }
+
+ private void handshakeFailure() {
+ //Release all resources such as internal buffers that SSLEngine is managing
+ sslEngine.closeOutbound();
+ try {
+ sslEngine.closeInbound();
+ } catch (SSLException e) {
+ log.debug("SSLEngine.closeInBound() raised an exception.", e);
+ }
+ }
+
+ @Override
+ public boolean isMute() {
+ return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 618a0fa..39eae4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -3,15 +3,16 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
@@ -103,4 +104,9 @@ public interface Selectable {
*/
public void unmuteAll();
-}
\ No newline at end of file
+ /**
+ * returns true if a channel is ready
+ * @param id The id for the connection
+ */
+ public boolean isChannelReady(String id);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index ce20111..12c911c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -43,26 +43,26 @@ import org.slf4j.LoggerFactory;
* responses.
* <p>
* A connection can be added to the nioSelector associated with an integer id by doing
- *
+ *
* <pre>
* nioSelector.connect("42", new InetSocketAddress("google.com", server.port), 64000, 64000);
* </pre>
- *
+ *
* The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
* the connection. The successful invocation of this method does not mean a valid connection has been established.
- *
+ *
* Sending requests, receiving responses, processing connection completions, and disconnections on the existing
* connections are all done using the <code>poll()</code> call.
- *
+ *
* <pre>
* nioSelector.send(new NetworkSend(myDestination, myBytes));
* nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
* nioSelector.poll(TIMEOUT_MS);
* </pre>
- *
+ *
* The nioSelector maintains several lists that are reset by each call to <code>poll()</code> which are available via
* various getters. These are reset by each call to <code>poll()</code>.
- *
+ *
* This class is not thread safe!
*/
public class Selector implements Selectable {
@@ -70,9 +70,10 @@ public class Selector implements Selectable {
private static final Logger log = LoggerFactory.getLogger(Selector.class);
private final java.nio.channels.Selector nioSelector;
- private final Map<String, SelectionKey> keys;
+ private final Map<String, KafkaChannel> channels;
private final List<Send> completedSends;
private final List<NetworkReceive> completedReceives;
+ private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final List<String> disconnected;
private final List<String> connected;
private final List<String> failedSends;
@@ -80,6 +81,7 @@ public class Selector implements Selectable {
private final SelectorMetrics sensors;
private final String metricGrpPrefix;
private final Map<String, String> metricTags;
+ private final ChannelBuilder channelBuilder;
private final Map<String, Long> lruConnections;
private final long connectionsMaxIdleNanos;
private final int maxReceiveSize;
@@ -91,7 +93,7 @@ public class Selector implements Selectable {
/**
* Create a new nioSelector
*/
- public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
+ public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
try {
this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
@@ -102,13 +104,15 @@ public class Selector implements Selectable {
this.time = time;
this.metricGrpPrefix = metricGrpPrefix;
this.metricTags = metricTags;
- this.keys = new HashMap<String, SelectionKey>();
+ this.channels = new HashMap<String, KafkaChannel>();
this.completedSends = new ArrayList<Send>();
this.completedReceives = new ArrayList<NetworkReceive>();
+ this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
this.connected = new ArrayList<String>();
this.disconnected = new ArrayList<String>();
this.failedSends = new ArrayList<String>();
this.sensors = new SelectorMetrics(metrics);
+ this.channelBuilder = channelBuilder;
// initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
currentTimeNanos = new SystemTime().nanoseconds();
@@ -116,8 +120,8 @@ public class Selector implements Selectable {
this.metricsPerConnection = metricsPerConnection;
}
- public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
- this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
+ public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, ChannelBuilder channelBuilder) {
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder);
}
/**
@@ -135,28 +139,29 @@ public class Selector implements Selectable {
*/
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
- if (this.keys.containsKey(id))
+ if (this.channels.containsKey(id))
throw new IllegalStateException("There is already a connection for id " + id);
- SocketChannel channel = SocketChannel.open();
- channel.configureBlocking(false);
- Socket socket = channel.socket();
+ SocketChannel socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
+ Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
socket.setSendBufferSize(sendBufferSize);
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
try {
- channel.connect(address);
+ socketChannel.connect(address);
} catch (UnresolvedAddressException e) {
- channel.close();
+ socketChannel.close();
throw new IOException("Can't resolve address: " + address, e);
} catch (IOException e) {
- channel.close();
+ socketChannel.close();
throw e;
}
- SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
- key.attach(new Transmissions(id));
- this.keys.put(id, key);
+ SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
+ KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+ key.attach(channel);
+ this.channels.put(id, channel);
}
/**
@@ -164,10 +169,11 @@ public class Selector implements Selectable {
* Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
* Note that we are not checking if the connection id is valid - since the connection already exists
*/
- public void register(String id, SocketChannel channel) throws ClosedChannelException {
- SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
- key.attach(new Transmissions(id));
- this.keys.put(id, key);
+ public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
+ SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
+ KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+ key.attach(channel);
+ this.channels.put(id, channel);
}
/**
@@ -176,9 +182,9 @@ public class Selector implements Selectable {
*/
@Override
public void disconnect(String id) {
- SelectionKey key = this.keys.get(id);
- if (key != null)
- key.cancel();
+ KafkaChannel channel = channelForId(id);
+ if (channel != null)
+ channel.disconnect();
}
/**
@@ -194,14 +200,15 @@ public class Selector implements Selectable {
*/
@Override
public void close() {
- List<String> connections = new LinkedList<String>(keys.keySet());
+ List<String> connections = new LinkedList<String>(channels.keySet());
for (String id: connections)
close(id);
-
try {
this.nioSelector.close();
} catch (IOException e) {
log.error("Exception closing nioSelector:", e);
+ } catch (SecurityException se) {
+ log.error("Exception closing nioSelector:", se);
}
}
@@ -210,28 +217,38 @@ public class Selector implements Selectable {
* @param send The request to send
*/
public void send(Send send) {
- SelectionKey key = keyForId(send.destination());
- Transmissions transmissions = transmissions(key);
- if (transmissions.hasSend())
- throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
- transmissions.send = send;
+ KafkaChannel channel = channelForId(send.destination());
+ if (channel == null)
+ throw new IllegalStateException("channel is not connected");
+
try {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ channel.setSend(send);
} catch (CancelledKeyException e) {
- close(transmissions.id);
this.failedSends.add(send.destination());
+ close(channel);
}
}
/**
* Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
* disconnections, initiating new sends, or making progress on in-progress sends or receives.
- *
+ *
* When this call is completed the user can check for completed sends, receives, connections or disconnects using
* {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
* lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if there is
* any completed I/O.
- *
+ *
+ * In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
+ * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
+ * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrpyted
+ * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
+ * application buffer size. This means we might be reading additional bytes than the requested size.
+ * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
+ * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
+ * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
+ * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
+ * and pop response and add to the completedReceives.
+ *
* @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
* @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
* already an in-progress send
@@ -239,7 +256,8 @@ public class Selector implements Selectable {
@Override
public void poll(long timeout) throws IOException {
clear();
-
+ if (hasStagedReceives())
+ timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
@@ -253,85 +271,73 @@ public class Selector implements Selectable {
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
-
- Transmissions transmissions = transmissions(key);
- SocketChannel channel = channel(key);
+ KafkaChannel channel = channel(key);
// register all per-connection metrics at once
- sensors.maybeRegisterConnectionMetrics(transmissions.id);
- lruConnections.put(transmissions.id, currentTimeNanos);
+ sensors.maybeRegisterConnectionMetrics(channel.id());
+ lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake */
if (key.isConnectable()) {
channel.finishConnect();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
- this.connected.add(transmissions.id);
+ this.connected.add(channel.id());
this.sensors.connectionCreated.record();
- log.debug("Connection {} created", transmissions.id);
}
- /* read from any connections that have readable data */
- if (key.isReadable()) {
- if (!transmissions.hasReceive())
- transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
+ /* if channel is not ready finish prepare */
+ if (channel.isConnected() && !channel.ready())
+ channel.prepare();
+
+ /* if channel is ready read from any connections that have readable data */
+ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
+ NetworkReceive networkReceive;
try {
- transmissions.receive.readFrom(channel);
+ while ((networkReceive = channel.read()) != null) {
+ addToStagedReceives(channel, networkReceive);
+ }
} catch (InvalidReceiveException e) {
- log.error("Invalid data received from " + transmissions.id + " closing connection", e);
- close(transmissions.id);
- this.disconnected.add(transmissions.id);
+ log.error("Invalid data received from " + channel.id() + " closing connection", e);
+ close(channel);
+ this.disconnected.add(channel.id());
throw e;
}
- if (transmissions.receive.complete()) {
- transmissions.receive.payload().rewind();
- this.completedReceives.add(transmissions.receive);
- this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
- transmissions.clearReceive();
- }
}
- /* write to any sockets that have space in their buffer and for which we have data */
- if (key.isWritable()) {
- transmissions.send.writeTo(channel);
- if (transmissions.send.completed()) {
- this.completedSends.add(transmissions.send);
- this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
- transmissions.clearSend();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+ /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
+ if (channel.ready() && key.isWritable()) {
+ Send send = channel.write();
+ if (send != null) {
+ this.completedSends.add(send);
+ this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid()) {
- close(transmissions.id);
- this.disconnected.add(transmissions.id);
+ close(channel);
+ this.disconnected.add(channel.id());
}
} catch (IOException e) {
- String desc = socketDescription(channel);
+ String desc = channel.socketDescription();
if (e instanceof EOFException || e instanceof ConnectException)
log.debug("Connection {} disconnected", desc);
else
log.warn("Error in I/O with connection to {}", desc, e);
- close(transmissions.id);
- this.disconnected.add(transmissions.id);
+ close(channel);
+ this.disconnected.add(channel.id());
}
}
}
+
+ addToCompletedReceives();
+
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
- private String socketDescription(SocketChannel channel) {
- Socket socket = channel.socket();
- if (socket == null)
- return "[unconnected socket]";
- else if (socket.getInetAddress() != null)
- return socket.getInetAddress().toString();
- else
- return socket.getLocalAddress().toString();
- }
+
@Override
public List<Send> completedSends() {
@@ -355,32 +361,34 @@ public class Selector implements Selectable {
@Override
public void mute(String id) {
- mute(this.keyForId(id));
+ KafkaChannel channel = channelForId(id);
+ mute(channel);
}
- private void mute(SelectionKey key) {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+ private void mute(KafkaChannel channel) {
+ channel.mute();
}
@Override
public void unmute(String id) {
- unmute(this.keyForId(id));
+ KafkaChannel channel = channelForId(id);
+ unmute(channel);
}
- private void unmute(SelectionKey key) {
- key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+ private void unmute(KafkaChannel channel) {
+ channel.unmute();
}
@Override
public void muteAll() {
- for (SelectionKey key : this.keys.values())
- mute(key);
+ for (KafkaChannel channel : this.channels.values())
+ mute(channel);
}
@Override
public void unmuteAll() {
- for (SelectionKey key : this.keys.values())
- unmute(key);
+ for (KafkaChannel channel : this.channels.values())
+ unmute(channel);
}
private void maybeCloseOldestConnection() {
@@ -418,7 +426,7 @@ public class Selector implements Selectable {
/**
* Check for data, waiting up to the given timeout.
- *
+ *
* @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
* @return The number of keys ready
* @throws IOException
@@ -434,81 +442,107 @@ public class Selector implements Selectable {
/**
* Begin closing this connection
+ * @param id channel id
*/
public void close(String id) {
- SelectionKey key = keyForId(id);
- lruConnections.remove(id);
- SocketChannel channel = channel(key);
- Transmissions trans = transmissions(key);
- if (trans != null) {
- this.keys.remove(trans.id);
- trans.clearReceive();
- trans.clearSend();
- }
- key.attach(null);
- key.cancel();
+ KafkaChannel channel = this.channels.get(id);
+ if (channel != null)
+ close(channel);
+ }
+
+ /**
+ * Begin closing this connection
+ */
+ private void close(KafkaChannel channel) {
try {
- channel.socket().close();
channel.close();
} catch (IOException e) {
- log.error("Exception closing connection to node {}:", trans.id, e);
+ log.error("Exception closing connection to node {}:", channel.id(), e);
}
+ this.stagedReceives.remove(channel);
+ this.channels.remove(channel.id());
+ this.lruConnections.remove(channel.id());
this.sensors.connectionClosed.record();
}
/**
- * Get the selection key associated with this numeric id
+ * check if channel is ready
*/
- private SelectionKey keyForId(String id) {
- SelectionKey key = this.keys.get(id);
- if (key == null)
- throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
- return key;
+ @Override
+ public boolean isChannelReady(String id) {
+ KafkaChannel channel = channelForId(id);
+ return channel.ready();
}
/**
- * Get the transmissions for the given connection
+ * Get the channel associated with this numeric id
*/
- private Transmissions transmissions(SelectionKey key) {
- return (Transmissions) key.attachment();
+ private KafkaChannel channelForId(String id) {
+ KafkaChannel channel = this.channels.get(id);
+ if (channel == null)
+ throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());
+ return channel;
}
/**
- * Get the socket channel associated with this selection key
+ * Get the channel associated with selectionKey
*/
- private SocketChannel channel(SelectionKey key) {
- return (SocketChannel) key.channel();
+ private KafkaChannel channel(SelectionKey key) {
+ return (KafkaChannel) key.attachment();
}
/**
- * The id and in-progress send and receive associated with a connection
+ * Check if given channel has a staged receive
*/
- private static class Transmissions {
- public String id;
- public Send send;
- public NetworkReceive receive;
+ private boolean hasStagedReceive(KafkaChannel channel) {
+ return stagedReceives.containsKey(channel);
+ }
- public Transmissions(String id) {
- this.id = id;
+ /**
+ * check if stagedReceives have unmuted channel
+ */
+ private boolean hasStagedReceives() {
+ for (KafkaChannel channel : this.stagedReceives.keySet()) {
+ if (!channel.isMute())
+ return true;
}
+ return false;
+ }
- public boolean hasSend() {
- return this.send != null;
- }
- public void clearSend() {
- this.send = null;
- }
+ /**
+ * adds a receive to staged receieves
+ */
+ private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
+ if (!stagedReceives.containsKey(channel))
+ stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
- public boolean hasReceive() {
- return this.receive != null;
- }
+ Deque<NetworkReceive> deque = stagedReceives.get(channel);
+ deque.add(receive);
+ }
- public void clearReceive() {
- this.receive = null;
+ /**
+ * checks if there are any staged receives and adds to completedReceives
+ */
+ private void addToCompletedReceives() {
+ if (this.stagedReceives.size() > 0) {
+ Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
+ KafkaChannel channel = entry.getKey();
+ if (!channel.isMute()) {
+ Deque<NetworkReceive> deque = entry.getValue();
+ NetworkReceive networkReceive = deque.poll();
+ this.completedReceives.add(networkReceive);
+ this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+ if (deque.size() == 0)
+ iter.remove();
+ }
+ }
}
}
+
private class SelectorMetrics {
private final Metrics metrics;
public final Sensor connectionClosed;
@@ -575,7 +609,7 @@ public class Selector implements Selectable {
metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
this.metrics.addMetric(metricName, new Measurable() {
public double measure(MetricConfig config, long now) {
- return keys.size();
+ return channels.size();
}
});
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index 8f6daad..e0d8831 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -33,7 +33,7 @@ public interface Send {
/**
* Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
* to be completely written
- * @param channel The channel to write to
+ * @param channel The Channel to write to
* @return The number of bytes written
* @throws IOException If the write fails
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
new file mode 100644
index 0000000..e9158aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+/*
+ * Transport layer for underlying communication.
+ * At very basic level it is wrapper around SocketChannel and can be used as substitue for SocketChannel
+ * and other network Channel implementations.
+ * As NetworkClient replaces BlockingChannel and other implementations we will be using KafkaChannel as
+ * a network I/O channel.
+ */
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.GatheringByteChannel;
+
+import java.security.Principal;
+
+
+public interface TransportLayer extends ScatteringByteChannel, GatheringByteChannel {
+
+ /**
+ * Returns true if the channel has handshake and authentication done.
+ */
+ boolean ready();
+
+ /**
+ * Finishes the process of connecting a socket channel.
+ */
+ void finishConnect() throws IOException;
+
+ /**
+ * disconnect socketChannel
+ */
+ void disconnect();
+
+ /**
+ * Tells whether or not this channel's network socket is connected.
+ */
+ boolean isConnected();
+
+ /**
+ * returns underlying socketChannel
+ */
+ SocketChannel socketChannel();
+
+
+ /**
+ * Performs SSL handshake hence is a no-op for the non-secure
+ * implementation
+ * @throws IOException
+ */
+ void handshake() throws IOException;
+
+ /**
+ * Returns true if there are any pending writes
+ */
+ boolean hasPendingWrites();
+
+ /**
+ * returns SSLSession.getPeerPrinicpal if SSLTransportLayer used
+ * for non-secure returns a "ANONYMOUS" as the peerPrincipal
+ */
+ Principal peerPrincipal() throws IOException;
+
+ void addInterestOps(int ops);
+
+ void removeInterestOps(int ops);
+
+ boolean isMute();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index dab1a94..a624741 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -24,6 +24,8 @@ import java.util.Map;
public enum SecurityProtocol {
/** Un-authenticated, non-encrypted channel */
PLAINTEXT(0, "PLAINTEXT"),
+ /** SSL channel */
+ SSL(1, "SSL"),
/** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
TRACE(Short.MAX_VALUE, "TRACE");
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
new file mode 100644
index 0000000..fbbeb9e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.util.Map;
+import java.security.Principal;
+
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.KafkaException;
+
+/** DefaultPrincipalBuilder which return transportLayer's peer Principal **/
+
+public class DefaultPrincipalBuilder implements PrincipalBuilder {
+
+ public void configure(Map<String, ?> configs) {}
+
+ public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException {
+ try {
+ return transportLayer.peerPrincipal();
+ } catch (Exception e) {
+ throw new KafkaException("Failed to build principal due to: ", e);
+ }
+ }
+
+ public void close() throws KafkaException {}
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
new file mode 100644
index 0000000..277b6ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.security.Principal;
+
+public class KafkaPrincipal implements Principal {
+ private final String name;
+
+ public KafkaPrincipal(String name) {
+ if (name == null)
+ throw new IllegalArgumentException("name is null");
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object)
+ return true;
+
+ if (object instanceof KafkaPrincipal) {
+ return name.equals(((KafkaPrincipal) object).getName());
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
new file mode 100644
index 0000000..b7cc378
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+/*
+ * PrincipalBuilder for Authenticator
+ */
+
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+import java.security.Principal;
+
+public interface PrincipalBuilder extends Configurable {
+
+ /**
+ * configure this class with give key-value pair
+ */
+ public void configure(Map<String, ?> configs);
+
+ /**
+ * Returns Principal
+ * @param TransportLayer
+ * @param Authenticator
+ */
+ Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException;
+
+ /**
+ * Close this PrincipalBuilder
+ */
+ public void close() throws KafkaException;
+
+}