You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/19 06:53:37 UTC

[3/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
new file mode 100644
index 0000000..a3567af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+/*
+ * Transport layer for PLAINTEXT communication
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+
+import java.security.Principal;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PlaintextTransportLayer implements TransportLayer {
+    private static final Logger log = LoggerFactory.getLogger(PlaintextTransportLayer.class);
+    private final SelectionKey key;
+    private final SocketChannel socketChannel;
+    private final Principal principal = new KafkaPrincipal("ANONYMOUS");
+
+    public PlaintextTransportLayer(SelectionKey key) throws IOException {
+        this.key = key;
+        this.socketChannel = (SocketChannel) key.channel();
+    }
+
+    @Override
+    public boolean ready() {
+        return true;
+    }
+
+    @Override
+    public void finishConnect() throws IOException {
+        socketChannel.finishConnect();
+        int ops = key.interestOps();
+        ops &= ~SelectionKey.OP_CONNECT;
+        ops |= SelectionKey.OP_READ;
+        key.interestOps(ops);
+    }
+
+    @Override
+    public void disconnect() {
+        key.cancel();
+    }
+
+    @Override
+    public SocketChannel socketChannel() {
+        return socketChannel;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return socketChannel.isOpen();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return socketChannel.isConnected();
+    }
+
+    /**
+     * Closes this channel
+     *
+     * @throws IOException If I/O error occurs
+     */
+    @Override
+    public void close() throws IOException {
+        socketChannel.socket().close();
+        socketChannel.close();
+        key.attach(null);
+        key.cancel();
+    }
+
+    /**
+     * Performs SSL handshake hence is a no-op for the non-secure
+     * implementation
+     * @throws IOException
+    */
+    @Override
+    public void handshake() throws IOException {}
+
+    /**
+    * Reads a sequence of bytes from this channel into the given buffer.
+    *
+    * @param dst The buffer into which bytes are to be transferred
+    * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
+    * @throws IOException if some other I/O error occurs
+    */
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        return socketChannel.read(dst);
+    }
+
+    /**
+     * Reads a sequence of bytes from this channel into the given buffers.
+     *
+     * @param dsts - The buffers into which bytes are to be transferred.
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts) throws IOException {
+        return socketChannel.read(dsts);
+    }
+
+    /**
+     * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
+     * @param dsts - The buffers into which bytes are to be transferred
+     * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
+     * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
+     * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        return socketChannel.read(dsts, offset, length);
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffer.
+    *
+    * @param src The buffer from which bytes are to be retrieved
+    * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        return socketChannel.write(src);
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffer.
+    *
+    * @param srcs The buffer from which bytes are to be retrieved
+    * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException {
+        return socketChannel.write(srcs);
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
+    * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
+    * @return returns no.of bytes written , possibly zero.
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        return socketChannel.write(srcs, offset, length);
+    }
+
+    /**
+     * always returns false as there will be not be any
+     * pending writes since we directly write to socketChannel.
+     */
+    @Override
+    public boolean hasPendingWrites() {
+        return false;
+    }
+
+    /**
+     * Returns ANONYMOUS as Principal.
+     */
+    @Override
+    public Principal peerPrincipal() throws IOException {
+        return principal;
+    }
+
+    /**
+     * Adds the interestOps to selectionKey.
+     * @param interestOps
+     */
+    @Override
+    public void addInterestOps(int ops) {
+        key.interestOps(key.interestOps() | ops);
+
+    }
+
+    /**
+     * Removes the interestOps from selectionKey.
+     * @param interestOps
+     */
+    @Override
+    public void removeInterestOps(int ops) {
+        key.interestOps(key.interestOps() & ~ops);
+    }
+
+    @Override
+    public boolean isMute() {
+        return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
new file mode 100644
index 0000000..88c218b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLChannelBuilder implements ChannelBuilder {
+    private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class);
+    private SSLFactory sslFactory;
+    private PrincipalBuilder principalBuilder;
+    private SSLFactory.Mode mode;
+
+    public SSLChannelBuilder(SSLFactory.Mode mode) {
+        this.mode = mode;
+    }
+
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        try {
+            this.sslFactory = new SSLFactory(mode);
+            this.sslFactory.configure(configs);
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder.configure(configs);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+        KafkaChannel channel = null;
+        try {
+            SocketChannel socketChannel = (SocketChannel) key.channel();
+            SSLTransportLayer transportLayer = new SSLTransportLayer(id, key,
+                                                                     sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
+                                                                                                socketChannel.socket().getPort()));
+            Authenticator authenticator = new DefaultAuthenticator();
+            authenticator.configure(transportLayer, this.principalBuilder);
+            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+        } catch (Exception e) {
+            log.info("Failed to create channel due to ", e);
+            throw new KafkaException(e);
+        }
+        return channel;
+    }
+
+    public void close()  {
+        this.principalBuilder.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
new file mode 100644
index 0000000..8ba1b01
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.CancelledKeyException;
+
+import java.security.Principal;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Transport layer for SSL communication
+ */
+
+public class SSLTransportLayer implements TransportLayer {
+    private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class);
+    private final String channelId;
+    protected final SSLEngine sslEngine;
+    private final SelectionKey key;
+    private final SocketChannel socketChannel;
+    private HandshakeStatus handshakeStatus;
+    private SSLEngineResult handshakeResult;
+    private boolean handshakeComplete = false;
+    private boolean closing = false;
+    private ByteBuffer netReadBuffer;
+    private ByteBuffer netWriteBuffer;
+    private ByteBuffer appReadBuffer;
+    private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
+
+    public SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+        this.channelId = channelId;
+        this.key = key;
+        this.socketChannel = (SocketChannel) key.channel();
+        this.sslEngine = sslEngine;
+        this.netReadBuffer = ByteBuffer.allocate(packetBufferSize());
+        this.netWriteBuffer = ByteBuffer.allocate(packetBufferSize());
+        this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
+        startHandshake();
+    }
+
+    /**
+     * starts sslEngine handshake process
+     */
+    private void startHandshake() throws IOException {
+        //clear & set netRead & netWrite buffers
+        netWriteBuffer.position(0);
+        netWriteBuffer.limit(0);
+        netReadBuffer.position(0);
+        netReadBuffer.limit(0);
+        handshakeComplete = false;
+        closing = false;
+        //initiate handshake
+        sslEngine.beginHandshake();
+        handshakeStatus = sslEngine.getHandshakeStatus();
+    }
+
+    @Override
+    public boolean ready() {
+        return handshakeComplete;
+    }
+
+    /**
+     * does socketChannel.finishConnect()
+     */
+    @Override
+    public void finishConnect() throws IOException {
+        socketChannel.finishConnect();
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
+    }
+
+    /**
+     * disconnects selectionKey.
+     */
+    @Override
+    public void disconnect() {
+        key.cancel();
+    }
+
+    @Override
+    public SocketChannel socketChannel() {
+        return socketChannel;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return socketChannel.isOpen();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return socketChannel.isConnected();
+    }
+
+
+    /**
+    * Sends a SSL close message and closes socketChannel.
+    * @throws IOException if an I/O error occurs
+    * @throws IOException if there is data on the outgoing network buffer and we are unable to flush it
+    */
+    @Override
+    public void close() throws IOException {
+        if (closing) return;
+        closing = true;
+        sslEngine.closeOutbound();
+        try {
+            if (!flush(netWriteBuffer)) {
+                throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
+            }
+            //prep the buffer for the close message
+            netWriteBuffer.clear();
+            //perform the close, since we called sslEngine.closeOutbound
+            SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netWriteBuffer);
+            //we should be in a close state
+            if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
+                throw new IOException("Invalid close state, will not send network data.");
+            }
+            netWriteBuffer.flip();
+            flush(netWriteBuffer);
+            socketChannel.socket().close();
+            socketChannel.close();
+        } catch (IOException ie) {
+            log.warn("Failed to send SSL Close message ", ie);
+        }
+        key.attach(null);
+        key.cancel();
+    }
+
+    /**
+     * returns true if there are any pending contents in netWriteBuffer
+     */
+    @Override
+    public boolean hasPendingWrites() {
+        return netWriteBuffer.hasRemaining();
+    }
+
+    /**
+    * Flushes the buffer to the network, non blocking
+    * @param buf ByteBuffer
+    * @return boolean true if the buffer has been emptied out, false otherwise
+    * @throws IOException
+    */
+    private boolean flush(ByteBuffer buf) throws IOException {
+        int remaining = buf.remaining();
+        if (remaining > 0) {
+            int written = socketChannel.write(buf);
+            return written >= remaining;
+        }
+        return true;
+    }
+
+    /**
+    * Performs SSL handshake, non blocking.
+    * Before application data (kafka protocols) can be sent client & kafka broker must
+    * perform ssl handshake.
+    * During the handshake SSLEngine generates encrypted data  that will be transported over socketChannel.
+    * Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus field is used to
+    * determine what operation needs to occur to move handshake along.
+    * A typical handshake might look like this.
+    * +-------------+----------------------------------+-------------+
+    * |  client     |  SSL/TLS message                 | HSStatus    |
+    * +-------------+----------------------------------+-------------+
+    * | wrap()      | ClientHello                      | NEED_UNWRAP |
+    * | unwrap()    | ServerHello/Cert/ServerHelloDone | NEED_WRAP   |
+    * | wrap()      | ClientKeyExchange                | NEED_WRAP   |
+    * | wrap()      | ChangeCipherSpec                 | NEED_WRAP   |
+    * | wrap()      | Finished                         | NEED_UNWRAP |
+    * | unwrap()    | ChangeCipherSpec                 | NEED_UNWRAP |
+    * | unwrap()    | Finished                         | FINISHED    |
+    * +-------------+----------------------------------+-------------+
+    *
+    * @throws IOException
+    */
+    @Override
+    public void handshake() throws IOException {
+        boolean read = key.isReadable();
+        boolean write = key.isWritable();
+        handshakeComplete = false;
+        handshakeStatus = sslEngine.getHandshakeStatus();
+        if (!flush(netWriteBuffer)) {
+            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            return;
+        }
+        try {
+            switch (handshakeStatus) {
+                case NEED_TASK:
+                    log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeStatus = runDelegatedTasks();
+                    break;
+                case NEED_WRAP:
+                    log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeResult = handshakeWrap(write);
+                    if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                        int currentPacketBufferSize = packetBufferSize();
+                        netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentPacketBufferSize);
+                        if (netWriteBuffer.position() >= currentPacketBufferSize) {
+                            throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.position() +
+                                                            ") >= network buffer size (" + currentPacketBufferSize + ")");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
+                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
+                        throw new EOFException();
+                    }
+                    log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    //if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents
+                    //we will break here otherwise we can do need_unwrap in the same call.
+                    if (handshakeStatus != HandshakeStatus.NEED_UNWRAP ||  !flush(netWriteBuffer)) {
+                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        break;
+                    }
+                case NEED_UNWRAP:
+                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeResult = handshakeUnwrap(read);
+                    if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        int currentPacketBufferSize = packetBufferSize();
+                        netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize);
+                        if (netReadBuffer.position() >= currentPacketBufferSize) {
+                            throw new IllegalStateException("Buffer underflow when there is available data");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                        int currentAppBufferSize = applicationBufferSize();
+                        appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
+                        if (appReadBuffer.position() > currentAppBufferSize) {
+                            throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() +
+                                                            ") > packet buffer size (" + currentAppBufferSize + ")");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
+                        throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
+                    }
+                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+
+                    //if handshakeStatus completed than fall-through to finished status.
+                    //after handshake is finished there is no data left to read/write in socketChannel.
+                    //so the selector won't invoke this channel if we don't go through the handshakeFinished here.
+                    if (handshakeStatus != HandshakeStatus.FINISHED) {
+                        if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
+                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        } else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
+                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                        }
+                        break;
+                    }
+                case FINISHED:
+                    handshakeFinished();
+                    break;
+                case NOT_HANDSHAKING:
+                    handshakeFinished();
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
+            }
+        } catch (SSLException e) {
+            handshakeFailure();
+            throw e;
+        }
+    }
+
+
+    /**
+     * Executes the SSLEngine tasks needed.
+     * @return HandshakeStatus
+     */
+    private HandshakeStatus runDelegatedTasks() {
+        for (;;) {
+            Runnable task = delegatedTask();
+            if (task == null) {
+                break;
+            }
+            task.run();
+        }
+        return sslEngine.getHandshakeStatus();
+    }
+
+    /**
+     * Checks if the handshake status is finished
+     * Sets the interestOps for the selectionKey.
+     */
+    private void handshakeFinished() throws IOException {
+        // SSLEnginge.getHandshakeStatus is transient and it doesn't record FINISHED status properly.
+        // It can move from FINISHED status to NOT_HANDSHAKING after the handshake is completed.
+        // Hence we also need to check handshakeResult.getHandshakeStatus() if the handshake finished or not
+        if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) {
+            //we are complete if we have delivered the last package
+            handshakeComplete = !netWriteBuffer.hasRemaining();
+            //remove OP_WRITE if we are complete, otherwise we still have data to write
+            if (!handshakeComplete)
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            else
+                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+
+            log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ",
+                      channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+        } else {
+            throw new IOException("NOT_HANDSHAKING during handshake");
+        }
+    }
+
+    /**
+    * Performs the WRAP function
+    * @param doWrite boolean
+    * @return SSLEngineResult
+    * @throws IOException
+    */
+    private SSLEngineResult  handshakeWrap(Boolean doWrite) throws IOException {
+        log.trace("SSLHandshake handshakeWrap", channelId);
+        if (netWriteBuffer.hasRemaining())
+            throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty");
+        //this should never be called with a network buffer that contains data
+        //so we can clear it here.
+        netWriteBuffer.clear();
+        SSLEngineResult result = sslEngine.wrap(emptyBuf, netWriteBuffer);
+        //prepare the results to be written
+        netWriteBuffer.flip();
+        handshakeStatus = result.getHandshakeStatus();
+        if (result.getStatus() == SSLEngineResult.Status.OK &&
+            result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+            handshakeStatus = runDelegatedTasks();
+        }
+
+        if (doWrite) flush(netWriteBuffer);
+        return result;
+    }
+
+    /**
+    * Perform handshake unwrap
+    * @param doRead boolean
+    * @return SSLEngineResult
+    * @throws IOException
+    */
+    private SSLEngineResult handshakeUnwrap(Boolean doRead) throws IOException {
+        log.trace("SSLHandshake handshakeUnwrap", channelId);
+        SSLEngineResult result;
+        boolean cont = false;
+        int read = 0;
+        if (doRead)  {
+            read = socketChannel.read(netReadBuffer);
+            if (read == -1) throw new EOFException("EOF during handshake.");
+        }
+        do {
+            //prepare the buffer with the incoming data
+            netReadBuffer.flip();
+            result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+            netReadBuffer.compact();
+            handshakeStatus = result.getHandshakeStatus();
+            if (result.getStatus() == SSLEngineResult.Status.OK &&
+                result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                handshakeStatus = runDelegatedTasks();
+            }
+            cont = result.getStatus() == SSLEngineResult.Status.OK &&
+                handshakeStatus == HandshakeStatus.NEED_UNWRAP;
+            log.trace("SSLHandshake handshakeUnwrap: handshakeStatus ", handshakeStatus);
+        } while (netReadBuffer.position() != 0 && cont);
+
+        return result;
+    }
+
+
+    /**
+    * Reads a sequence of bytes from this channel into the given buffer.
+    *
+    * @param dst The buffer into which bytes are to be transferred
+    * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
+    * @throws IOException if some other I/O error occurs
+    */
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        if (closing) return -1;
+        int read = 0;
+        if (!handshakeComplete) return read;
+
+        //if we have unread decrypted data in appReadBuffer read that into dst buffer.
+        if (appReadBuffer.position() > 0) {
+            read = readFromAppBuffer(dst);
+        }
+
+        if (dst.remaining() > 0) {
+            netReadBuffer = Utils.ensureCapacity(netReadBuffer, packetBufferSize());
+            if (netReadBuffer.remaining() > 0) {
+                int netread = socketChannel.read(netReadBuffer);
+                if (netread == 0) return netread;
+                else if (netread < 0) throw new EOFException("EOF during read");
+            }
+            do {
+                netReadBuffer.flip();
+                SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+                netReadBuffer.compact();
+                // handle ssl renegotiation.
+                if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+                    log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshake();
+                    break;
+                }
+
+                if (unwrapResult.getStatus() == Status.OK) {
+                    read += readFromAppBuffer(dst);
+                } else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                    int currentApplicationBufferSize = applicationBufferSize();
+                    appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
+                    if (appReadBuffer.position() >= currentApplicationBufferSize) {
+                        throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
+                                                        ") >= application buffer size (" + currentApplicationBufferSize + ")");
+                    }
+
+                    // appReadBuffer will extended upto currentApplicationBufferSize
+                    // we need to read the existing content into dst before we can do unwrap again.  If there are no space in dst
+                    // we can break here.
+                    if (dst.hasRemaining())
+                        read += readFromAppBuffer(dst);
+                    else
+                        break;
+                } else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                    int currentPacketBufferSize = packetBufferSize();
+                    netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize);
+                    if (netReadBuffer.position() >= currentPacketBufferSize) {
+                        throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
+                                                        ") > packet buffer size (" + currentPacketBufferSize + ")");
+                    }
+                    break;
+                } else if (unwrapResult.getStatus() == Status.CLOSED) {
+                    throw new EOFException();
+                }
+            } while (netReadBuffer.position() != 0);
+        }
+        return read;
+    }
+
+
+    /**
+     * Reads a sequence of bytes from this channel into the given buffers.
+     *
+     * @param dsts - The buffers into which bytes are to be transferred.
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts) throws IOException {
+        return read(dsts, 0, dsts.length);
+    }
+
+
+    /**
+     * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
+     * @param dsts - The buffers into which bytes are to be transferred
+     * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
+     * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
+     * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
+            throw new IndexOutOfBoundsException();
+
+        int totalRead = 0;
+        int i = offset;
+        while (i < length) {
+            if (dsts[i].hasRemaining()) {
+                int read = read(dsts[i]);
+                if (read > 0)
+                    totalRead += read;
+                else
+                    break;
+            }
+            if (!dsts[i].hasRemaining()) {
+                i++;
+            }
+        }
+        return totalRead;
+    }
+
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffer.
+    *
+    * @param src The buffer from which bytes are to be retrieved
+    * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        int written = 0;
+        if (closing) throw new IllegalStateException("Channel is in closing state");
+        if (!handshakeComplete) return written;
+
+        if (!flush(netWriteBuffer))
+            return written;
+
+        netWriteBuffer.clear();
+        SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
+        netWriteBuffer.flip();
+
+        //handle ssl renegotiation
+        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+            handshake();
+            return written;
+        }
+
+        if (wrapResult.getStatus() == Status.OK) {
+            written = wrapResult.bytesConsumed();
+            flush(netWriteBuffer);
+        } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+            int currentPacketBufferSize = packetBufferSize();
+            netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, packetBufferSize());
+            if (netWriteBuffer.position() >= currentPacketBufferSize)
+                throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.position() + ") >= network buffer size (" + currentPacketBufferSize + ")");
+        } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+            throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
+        } else if (wrapResult.getStatus() == Status.CLOSED) {
+            throw new EOFException();
+        }
+        return written;
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
+    * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
+    * @return returns no.of bytes written , possibly zero.
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length)  throws IOException {
+        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
+            throw new IndexOutOfBoundsException();
+        int totalWritten = 0;
+        int i = offset;
+        while (i < length) {
+            if (srcs[i].hasRemaining() || hasPendingWrites()) {
+                int written = write(srcs[i]);
+                if (written > 0) {
+                    totalWritten += written;
+                }
+            }
+            if (!srcs[i].hasRemaining() && !hasPendingWrites()) {
+                i++;
+            } else {
+                // if we are unable to write the current buffer to socketChannel we should break,
+                // as we might have reached max socket send buffer size.
+                break;
+            }
+        }
+        return totalWritten;
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffers.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @return returns no.of bytes consumed by SSLEngine.wrap , possibly zero.
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException {
+        return write(srcs, 0, srcs.length);
+    }
+
+
+    /**
+     * SSLSession's peerPrincipal for the remote host.
+     * @return Principal
+     */
+    public Principal peerPrincipal() throws IOException {
+        try {
+            return sslEngine.getSession().getPeerPrincipal();
+        } catch (SSLPeerUnverifiedException se) {
+            throw new IOException(String.format("Unable to retrieve getPeerPrincipal due to %s", se));
+        }
+    }
+
+    /**
+     * returns a SSL Session after the handshake is established
+     * throws IllegalStateException if the handshake is not established
+     */
+    public SSLSession sslSession() throws IllegalStateException {
+        return sslEngine.getSession();
+    }
+
+    /**
+     * Adds interestOps to SelectionKey of the TransportLayer
+     * @param ops SelectionKey interestOps
+     */
+    @Override
+    public void addInterestOps(int ops) {
+        if (!key.isValid())
+            throw new CancelledKeyException();
+        else if (!handshakeComplete)
+            throw new IllegalStateException("handshake is not completed");
+
+        key.interestOps(key.interestOps() | ops);
+    }
+
+    /**
+     * removes interestOps to SelectionKey of the TransportLayer
+     * @param ops SelectionKey interestOps
+     */
+    @Override
+    public void removeInterestOps(int ops) {
+        if (!key.isValid())
+            throw new CancelledKeyException();
+        else if (!handshakeComplete)
+            throw new IllegalStateException("handshake is not completed");
+
+        key.interestOps(key.interestOps() & ~ops);
+    }
+
+
+    /**
+     * returns delegatedTask for the SSLEngine.
+     */
+    protected Runnable delegatedTask() {
+        return sslEngine.getDelegatedTask();
+    }
+
+    /**
+     * transfers appReadBuffer contents (decrypted data) into dst bytebuffer
+     * @param dst ByteBuffer
+     */
+    private int readFromAppBuffer(ByteBuffer dst) {
+        appReadBuffer.flip();
+        int remaining = Math.min(appReadBuffer.remaining(), dst.remaining());
+        if (remaining > 0) {
+            int limit = appReadBuffer.limit();
+            appReadBuffer.limit(appReadBuffer.position() + remaining);
+            dst.put(appReadBuffer);
+            appReadBuffer.limit(limit);
+        }
+        appReadBuffer.compact();
+        return remaining;
+    }
+
+    private int packetBufferSize() {
+        return sslEngine.getSession().getPacketBufferSize();
+    }
+
+    private int applicationBufferSize() {
+        return sslEngine.getSession().getApplicationBufferSize();
+    }
+
+    private void handshakeFailure() {
+        //Release all resources such as internal buffers that SSLEngine is managing
+        sslEngine.closeOutbound();
+        try {
+            sslEngine.closeInbound();
+        } catch (SSLException e) {
+            log.debug("SSLEngine.closeInBound() raised an exception.",  e);
+        }
+    }
+
+    @Override
+    public boolean isMute() {
+        return  key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 618a0fa..39eae4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -3,15 +3,16 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -103,4 +104,9 @@ public interface Selectable {
      */
     public void unmuteAll();
 
-}
\ No newline at end of file
+    /**
+     * returns true  if a channel is ready
+     * @param id The id for the connection
+     */
+    public boolean isChannelReady(String id);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index ce20111..12c911c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -43,26 +43,26 @@ import org.slf4j.LoggerFactory;
  * responses.
  * <p>
  * A connection can be added to the nioSelector associated with an integer id by doing
- * 
+ *
  * <pre>
  * nioSelector.connect(&quot;42&quot;, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
  * </pre>
- * 
+ *
  * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
  * the connection. The successful invocation of this method does not mean a valid connection has been established.
- * 
+ *
  * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
  * connections are all done using the <code>poll()</code> call.
- * 
+ *
  * <pre>
  * nioSelector.send(new NetworkSend(myDestination, myBytes));
  * nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
  * nioSelector.poll(TIMEOUT_MS);
  * </pre>
- * 
+ *
  * The nioSelector maintains several lists that are reset by each call to <code>poll()</code> which are available via
  * various getters. These are reset by each call to <code>poll()</code>.
- * 
+ *
  * This class is not thread safe!
  */
 public class Selector implements Selectable {
@@ -70,9 +70,10 @@ public class Selector implements Selectable {
     private static final Logger log = LoggerFactory.getLogger(Selector.class);
 
     private final java.nio.channels.Selector nioSelector;
-    private final Map<String, SelectionKey> keys;
+    private final Map<String, KafkaChannel> channels;
     private final List<Send> completedSends;
     private final List<NetworkReceive> completedReceives;
+    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
     private final List<String> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
@@ -80,6 +81,7 @@ public class Selector implements Selectable {
     private final SelectorMetrics sensors;
     private final String metricGrpPrefix;
     private final Map<String, String> metricTags;
+    private final ChannelBuilder channelBuilder;
     private final Map<String, Long> lruConnections;
     private final long connectionsMaxIdleNanos;
     private final int maxReceiveSize;
@@ -91,7 +93,7 @@ public class Selector implements Selectable {
     /**
      * Create a new nioSelector
      */
-    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
+    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -102,13 +104,15 @@ public class Selector implements Selectable {
         this.time = time;
         this.metricGrpPrefix = metricGrpPrefix;
         this.metricTags = metricTags;
-        this.keys = new HashMap<String, SelectionKey>();
+        this.channels = new HashMap<String, KafkaChannel>();
         this.completedSends = new ArrayList<Send>();
         this.completedReceives = new ArrayList<NetworkReceive>();
+        this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
         this.connected = new ArrayList<String>();
         this.disconnected = new ArrayList<String>();
         this.failedSends = new ArrayList<String>();
         this.sensors = new SelectorMetrics(metrics);
+        this.channelBuilder = channelBuilder;
         // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
         this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
         currentTimeNanos = new SystemTime().nanoseconds();
@@ -116,8 +120,8 @@ public class Selector implements Selectable {
         this.metricsPerConnection = metricsPerConnection;
     }
 
-    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, ChannelBuilder channelBuilder) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder);
     }
 
     /**
@@ -135,28 +139,29 @@ public class Selector implements Selectable {
      */
     @Override
     public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
-        if (this.keys.containsKey(id))
+        if (this.channels.containsKey(id))
             throw new IllegalStateException("There is already a connection for id " + id);
 
-        SocketChannel channel = SocketChannel.open();
-        channel.configureBlocking(false);
-        Socket socket = channel.socket();
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(false);
+        Socket socket = socketChannel.socket();
         socket.setKeepAlive(true);
         socket.setSendBufferSize(sendBufferSize);
         socket.setReceiveBufferSize(receiveBufferSize);
         socket.setTcpNoDelay(true);
         try {
-            channel.connect(address);
+            socketChannel.connect(address);
         } catch (UnresolvedAddressException e) {
-            channel.close();
+            socketChannel.close();
             throw new IOException("Can't resolve address: " + address, e);
         } catch (IOException e) {
-            channel.close();
+            socketChannel.close();
             throw e;
         }
-        SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
+        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
+        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        key.attach(channel);
+        this.channels.put(id, channel);
     }
 
     /**
@@ -164,10 +169,11 @@ public class Selector implements Selectable {
      * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
      * Note that we are not checking if the connection id is valid - since the connection already exists
      */
-    public void register(String id, SocketChannel channel) throws ClosedChannelException {
-        SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
+    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
+        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
+        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        key.attach(channel);
+        this.channels.put(id, channel);
     }
 
     /**
@@ -176,9 +182,9 @@ public class Selector implements Selectable {
      */
     @Override
     public void disconnect(String id) {
-        SelectionKey key = this.keys.get(id);
-        if (key != null)
-            key.cancel();
+        KafkaChannel channel = channelForId(id);
+        if (channel != null)
+            channel.disconnect();
     }
 
     /**
@@ -194,14 +200,15 @@ public class Selector implements Selectable {
      */
     @Override
     public void close() {
-        List<String> connections = new LinkedList<String>(keys.keySet());
+        List<String> connections = new LinkedList<String>(channels.keySet());
         for (String id: connections)
             close(id);
-
         try {
             this.nioSelector.close();
         } catch (IOException e) {
             log.error("Exception closing nioSelector:", e);
+        } catch (SecurityException se) {
+            log.error("Exception closing nioSelector:", se);
         }
     }
 
@@ -210,28 +217,38 @@ public class Selector implements Selectable {
      * @param send The request to send
      */
     public void send(Send send) {
-        SelectionKey key = keyForId(send.destination());
-        Transmissions transmissions = transmissions(key);
-        if (transmissions.hasSend())
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
-        transmissions.send = send;
+        KafkaChannel channel = channelForId(send.destination());
+        if (channel == null)
+            throw new IllegalStateException("channel is not connected");
+
         try {
-            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            channel.setSend(send);
         } catch (CancelledKeyException e) {
-            close(transmissions.id);
             this.failedSends.add(send.destination());
+            close(channel);
         }
     }
 
     /**
      * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
      * disconnections, initiating new sends, or making progress on in-progress sends or receives.
-     * 
+     *
      * When this call is completed the user can check for completed sends, receives, connections or disconnects using
      * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
      * lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if there is
      * any completed I/O.
-     * 
+     *
+     * In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
+     * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
+     * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrpyted
+     * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
+     * application buffer size. This means we might be reading additional bytes than the requested size.
+     * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
+     * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
+     * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
+     * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
+     * and pop response and add to the completedReceives.
+     *
      * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
      * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
      *         already an in-progress send
@@ -239,7 +256,8 @@ public class Selector implements Selectable {
     @Override
     public void poll(long timeout) throws IOException {
         clear();
-
+        if (hasStagedReceives())
+            timeout = 0;
         /* check ready keys */
         long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
@@ -253,85 +271,73 @@ public class Selector implements Selectable {
             while (iter.hasNext()) {
                 SelectionKey key = iter.next();
                 iter.remove();
-
-                Transmissions transmissions = transmissions(key);
-                SocketChannel channel = channel(key);
+                KafkaChannel channel = channel(key);
 
                 // register all per-connection metrics at once
-                sensors.maybeRegisterConnectionMetrics(transmissions.id);
-                lruConnections.put(transmissions.id, currentTimeNanos);
+                sensors.maybeRegisterConnectionMetrics(channel.id());
+                lruConnections.put(channel.id(), currentTimeNanos);
 
                 try {
                     /* complete any connections that have finished their handshake */
                     if (key.isConnectable()) {
                         channel.finishConnect();
-                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
-                        this.connected.add(transmissions.id);
+                        this.connected.add(channel.id());
                         this.sensors.connectionCreated.record();
-                        log.debug("Connection {} created", transmissions.id);
                     }
 
-                    /* read from any connections that have readable data */
-                    if (key.isReadable()) {
-                        if (!transmissions.hasReceive())
-                            transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
+                    /* if channel is not ready finish prepare */
+                    if (channel.isConnected() && !channel.ready())
+                        channel.prepare();
+
+                    /* if channel is ready read from any connections that have readable data */
+                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
+                        NetworkReceive networkReceive;
                         try {
-                            transmissions.receive.readFrom(channel);
+                            while ((networkReceive = channel.read()) != null) {
+                                addToStagedReceives(channel, networkReceive);
+                            }
                         } catch (InvalidReceiveException e) {
-                            log.error("Invalid data received from " + transmissions.id + " closing connection", e);
-                            close(transmissions.id);
-                            this.disconnected.add(transmissions.id);
+                            log.error("Invalid data received from " + channel.id() + " closing connection", e);
+                            close(channel);
+                            this.disconnected.add(channel.id());
                             throw e;
                         }
-                        if (transmissions.receive.complete()) {
-                            transmissions.receive.payload().rewind();
-                            this.completedReceives.add(transmissions.receive);
-                            this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
-                            transmissions.clearReceive();
-                        }
                     }
 
-                    /* write to any sockets that have space in their buffer and for which we have data */
-                    if (key.isWritable()) {
-                        transmissions.send.writeTo(channel);
-                        if (transmissions.send.completed()) {
-                            this.completedSends.add(transmissions.send);
-                            this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
-                            transmissions.clearSend();
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                    /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
+                    if (channel.ready() && key.isWritable()) {
+                        Send send = channel.write();
+                        if (send != null) {
+                            this.completedSends.add(send);
+                            this.sensors.recordBytesSent(channel.id(), send.size());
                         }
                     }
 
                     /* cancel any defunct sockets */
                     if (!key.isValid()) {
-                        close(transmissions.id);
-                        this.disconnected.add(transmissions.id);
+                        close(channel);
+                        this.disconnected.add(channel.id());
                     }
                 } catch (IOException e) {
-                    String desc = socketDescription(channel);
+                    String desc = channel.socketDescription();
                     if (e instanceof EOFException || e instanceof ConnectException)
                         log.debug("Connection {} disconnected", desc);
                     else
                         log.warn("Error in I/O with connection to {}", desc, e);
-                    close(transmissions.id);
-                    this.disconnected.add(transmissions.id);
+                    close(channel);
+                    this.disconnected.add(channel.id());
                 }
             }
         }
+
+        addToCompletedReceives();
+
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
         maybeCloseOldestConnection();
     }
 
-    private String socketDescription(SocketChannel channel) {
-        Socket socket = channel.socket();
-        if (socket == null)
-            return "[unconnected socket]";
-        else if (socket.getInetAddress() != null)
-            return socket.getInetAddress().toString();
-        else
-            return socket.getLocalAddress().toString();
-    }
+
 
     @Override
     public List<Send> completedSends() {
@@ -355,32 +361,34 @@ public class Selector implements Selectable {
 
     @Override
     public void mute(String id) {
-        mute(this.keyForId(id));
+        KafkaChannel channel = channelForId(id);
+        mute(channel);
     }
 
-    private void mute(SelectionKey key) {
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+    private void mute(KafkaChannel channel) {
+        channel.mute();
     }
 
     @Override
     public void unmute(String id) {
-        unmute(this.keyForId(id));
+        KafkaChannel channel = channelForId(id);
+        unmute(channel);
     }
 
-    private void unmute(SelectionKey key) {
-        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+    private void unmute(KafkaChannel channel) {
+        channel.unmute();
     }
 
     @Override
     public void muteAll() {
-        for (SelectionKey key : this.keys.values())
-            mute(key);
+        for (KafkaChannel channel : this.channels.values())
+            mute(channel);
     }
 
     @Override
     public void unmuteAll() {
-        for (SelectionKey key : this.keys.values())
-            unmute(key);
+        for (KafkaChannel channel : this.channels.values())
+            unmute(channel);
     }
 
     private void maybeCloseOldestConnection() {
@@ -418,7 +426,7 @@ public class Selector implements Selectable {
 
     /**
      * Check for data, waiting up to the given timeout.
-     * 
+     *
      * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
      * @return The number of keys ready
      * @throws IOException
@@ -434,81 +442,107 @@ public class Selector implements Selectable {
 
     /**
      * Begin closing this connection
+     * @param id channel id
      */
     public void close(String id) {
-        SelectionKey key = keyForId(id);
-        lruConnections.remove(id);
-        SocketChannel channel = channel(key);
-        Transmissions trans = transmissions(key);
-        if (trans != null) {
-            this.keys.remove(trans.id);
-            trans.clearReceive();
-            trans.clearSend();
-        }
-        key.attach(null);
-        key.cancel();
+        KafkaChannel channel = this.channels.get(id);
+        if (channel != null)
+            close(channel);
+    }
+
+    /**
+     * Begin closing this connection
+     */
+    private void close(KafkaChannel channel) {
         try {
-            channel.socket().close();
             channel.close();
         } catch (IOException e) {
-            log.error("Exception closing connection to node {}:", trans.id, e);
+            log.error("Exception closing connection to node {}:", channel.id(), e);
         }
+        this.stagedReceives.remove(channel);
+        this.channels.remove(channel.id());
+        this.lruConnections.remove(channel.id());
         this.sensors.connectionClosed.record();
     }
 
     /**
-     * Get the selection key associated with this numeric id
+     * check if channel is ready
      */
-    private SelectionKey keyForId(String id) {
-        SelectionKey key = this.keys.get(id);
-        if (key == null)
-            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
-        return key;
+    @Override
+    public boolean isChannelReady(String id) {
+        KafkaChannel channel = channelForId(id);
+        return channel.ready();
     }
 
     /**
-     * Get the transmissions for the given connection
+     * Get the channel associated with this numeric id
      */
-    private Transmissions transmissions(SelectionKey key) {
-        return (Transmissions) key.attachment();
+    private KafkaChannel channelForId(String id) {
+        KafkaChannel channel = this.channels.get(id);
+        if (channel == null)
+            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());
+        return channel;
     }
 
     /**
-     * Get the socket channel associated with this selection key
+     * Get the channel associated with selectionKey
      */
-    private SocketChannel channel(SelectionKey key) {
-        return (SocketChannel) key.channel();
+    private KafkaChannel channel(SelectionKey key) {
+        return (KafkaChannel) key.attachment();
     }
 
     /**
-     * The id and in-progress send and receive associated with a connection
+     * Check if given channel has a staged receive
      */
-    private static class Transmissions {
-        public String id;
-        public Send send;
-        public NetworkReceive receive;
+    private boolean hasStagedReceive(KafkaChannel channel) {
+        return stagedReceives.containsKey(channel);
+    }
 
-        public Transmissions(String id) {
-            this.id = id;
+    /**
+     * check if stagedReceives have unmuted channel
+     */
+    private boolean hasStagedReceives() {
+        for (KafkaChannel channel : this.stagedReceives.keySet()) {
+            if (!channel.isMute())
+                return true;
         }
+        return false;
+    }
 
-        public boolean hasSend() {
-            return this.send != null;
-        }
 
-        public void clearSend() {
-            this.send = null;
-        }
+    /**
+     * adds a receive to staged receieves
+     */
+    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
+        if (!stagedReceives.containsKey(channel))
+            stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
 
-        public boolean hasReceive() {
-            return this.receive != null;
-        }
+        Deque<NetworkReceive> deque = stagedReceives.get(channel);
+        deque.add(receive);
+    }
 
-        public void clearReceive() {
-            this.receive = null;
+    /**
+     * checks if there are any staged receives and adds to completedReceives
+     */
+    private void addToCompletedReceives() {
+        if (this.stagedReceives.size() > 0) {
+            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
+                KafkaChannel channel = entry.getKey();
+                if (!channel.isMute()) {
+                    Deque<NetworkReceive> deque = entry.getValue();
+                    NetworkReceive networkReceive = deque.poll();
+                    this.completedReceives.add(networkReceive);
+                    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+                    if (deque.size() == 0)
+                        iter.remove();
+                }
+            }
         }
     }
 
+
     private class SelectorMetrics {
         private final Metrics metrics;
         public final Sensor connectionClosed;
@@ -575,7 +609,7 @@ public class Selector implements Selectable {
             metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
             this.metrics.addMetric(metricName, new Measurable() {
                 public double measure(MetricConfig config, long now) {
-                    return keys.size();
+                    return channels.size();
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index 8f6daad..e0d8831 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -33,7 +33,7 @@ public interface Send {
     /**
      * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
      * to be completely written
-     * @param channel The channel to write to
+     * @param channel The Channel to write to
      * @return The number of bytes written
      * @throws IOException If the write fails
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
new file mode 100644
index 0000000..e9158aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+/*
+ * Transport layer for underlying communication.
+ * At very basic level it is wrapper around SocketChannel and can be used as substitue for SocketChannel
+ * and other network Channel implementations.
+ * As NetworkClient replaces BlockingChannel and other implementations we will be using KafkaChannel as
+ * a network I/O channel.
+ */
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.GatheringByteChannel;
+
+import java.security.Principal;
+
+
+public interface TransportLayer extends ScatteringByteChannel, GatheringByteChannel {
+
+    /**
+     * Returns true if the channel has handshake and authentication done.
+     */
+    boolean ready();
+
+    /**
+     * Finishes the process of connecting a socket channel.
+     */
+    void finishConnect() throws IOException;
+
+    /**
+     * disconnect socketChannel
+     */
+    void disconnect();
+
+    /**
+     * Tells whether or not this channel's network socket is connected.
+     */
+    boolean isConnected();
+
+    /**
+     * returns underlying socketChannel
+     */
+    SocketChannel socketChannel();
+
+
+    /**
+     * Performs SSL handshake hence is a no-op for the non-secure
+     * implementation
+     * @throws IOException
+    */
+    void handshake() throws IOException;
+
+    /**
+     * Returns true if there are any pending writes
+     */
+    boolean hasPendingWrites();
+
+    /**
+     * returns SSLSession.getPeerPrinicpal if SSLTransportLayer used
+     * for non-secure returns a "ANONYMOUS" as the peerPrincipal
+     */
+    Principal peerPrincipal() throws IOException;
+
+    void addInterestOps(int ops);
+
+    void removeInterestOps(int ops);
+
+    boolean isMute();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index dab1a94..a624741 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -24,6 +24,8 @@ import java.util.Map;
 public enum SecurityProtocol {
     /** Un-authenticated, non-encrypted channel */
     PLAINTEXT(0, "PLAINTEXT"),
+    /** SSL channel */
+    SSL(1, "SSL"),
     /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
     TRACE(Short.MAX_VALUE, "TRACE");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
new file mode 100644
index 0000000..fbbeb9e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.util.Map;
+import java.security.Principal;
+
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.KafkaException;
+
+/** DefaultPrincipalBuilder which return transportLayer's peer Principal **/
+
+public class DefaultPrincipalBuilder implements PrincipalBuilder {
+
+    public void configure(Map<String, ?> configs) {}
+
+    public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException {
+        try {
+            return transportLayer.peerPrincipal();
+        } catch (Exception e) {
+            throw new KafkaException("Failed to build principal due to: ", e);
+        }
+    }
+
+    public void close() throws KafkaException {}
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
new file mode 100644
index 0000000..277b6ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.security.Principal;
+
+public class KafkaPrincipal implements Principal {
+    private final String name;
+
+    public KafkaPrincipal(String name) {
+        if (name == null)
+            throw new IllegalArgumentException("name is null");
+        this.name = name;
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object)
+            return true;
+
+        if (object instanceof KafkaPrincipal) {
+            return name.equals(((KafkaPrincipal) object).getName());
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return name.hashCode();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
new file mode 100644
index 0000000..b7cc378
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+/*
+ * PrincipalBuilder for Authenticator
+ */
+
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+import java.security.Principal;
+
+public interface PrincipalBuilder extends Configurable {
+
+    /**
+     * configure this class with give key-value pair
+     */
+    public void configure(Map<String, ?> configs);
+
+    /**
+     * Returns Principal
+     * @param TransportLayer
+     * @param Authenticator
+     */
+    Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException;
+
+    /**
+     * Close this PrincipalBuilder
+     */
+    public void close() throws KafkaException;
+
+}