You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by dr...@apache.org on 2015/01/22 01:56:32 UTC

[08/50] [abbrv] directory-kerberos git commit: Many changes with newname

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/TransBuffer.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/TransBuffer.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/TransBuffer.java
new file mode 100644
index 0000000..079a6cd
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/TransBuffer.java
@@ -0,0 +1,49 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.buffer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+public class TransBuffer {
+
+    private BlockingQueue<ByteBuffer> bufferQueue;
+
+    public TransBuffer() {
+        bufferQueue = new ArrayBlockingQueue<ByteBuffer>(2);
+    }
+
+    public void write(ByteBuffer buffer) {
+        bufferQueue.add(buffer);
+    }
+
+    public void write(byte[] buffer) {
+        write(ByteBuffer.wrap(buffer));
+    }
+
+    public ByteBuffer read() {
+        return bufferQueue.poll();
+    }
+
+    public boolean isEmpty() {
+        return bufferQueue.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/AddressEvent.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/AddressEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/AddressEvent.java
new file mode 100644
index 0000000..80340c8
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/AddressEvent.java
@@ -0,0 +1,39 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+
+import java.net.InetSocketAddress;
+
+public class AddressEvent extends Event {
+
+    private InetSocketAddress address;
+
+    public AddressEvent(InetSocketAddress address, EventType eventType) {
+        super(eventType);
+        this.address = address;
+    }
+
+    public InetSocketAddress getAddress() {
+        return address;
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/MessageEvent.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/MessageEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/MessageEvent.java
new file mode 100644
index 0000000..b9d48eb
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/MessageEvent.java
@@ -0,0 +1,41 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.transport.Transport;
+
+import java.nio.ByteBuffer;
+
+public class MessageEvent extends TransportEvent {
+
+    private MessageEvent(Transport transport, ByteBuffer message) {
+        super(transport, TransportEventType.INBOUND_MESSAGE, message);
+    }
+
+    public ByteBuffer getMessage() {
+        return (ByteBuffer) getEventData();
+    }
+
+    public static MessageEvent createInboundMessageEvent(
+            Transport transport, ByteBuffer message) {
+        return new MessageEvent(transport, message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEvent.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEvent.java
new file mode 100644
index 0000000..3c2ae34
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEvent.java
@@ -0,0 +1,56 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Transport;
+
+public class TransportEvent extends Event {
+
+    private Transport transport;
+
+    public TransportEvent(Transport transport, EventType eventType) {
+        super(eventType);
+        this.transport = transport;
+    }
+
+    public TransportEvent(Transport transport, EventType eventType, Object eventData) {
+        super(eventType, eventData);
+        this.transport = transport;
+    }
+
+    public Transport getTransport() {
+        return transport;
+    }
+
+    public static TransportEvent createWritableTransportEvent(Transport transport) {
+        return new TransportEvent(transport, TransportEventType.TRANSPORT_WRITABLE);
+    }
+
+    public static TransportEvent createReadableTransportEvent(Transport transport) {
+        return new TransportEvent(transport, TransportEventType.TRANSPORT_READABLE);
+    }
+
+    public static TransportEvent createNewTransportEvent(Transport transport) {
+        return new TransportEvent(transport, TransportEventType.NEW_TRANSPORT);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEventType.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEventType.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEventType.java
new file mode 100644
index 0000000..6036c02
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEventType.java
@@ -0,0 +1,29 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.event.EventType;
+
+public enum TransportEventType implements EventType {
+    NEW_TRANSPORT,
+    TRANSPORT_WRITABLE,
+    TRANSPORT_READABLE,
+    INBOUND_MESSAGE
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/DecodingCallback.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/DecodingCallback.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/DecodingCallback.java
new file mode 100644
index 0000000..8b8c352
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/DecodingCallback.java
@@ -0,0 +1,38 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+public interface DecodingCallback {
+
+    /**
+     * OK, enough data is ready, a message can be out
+     */
+    public void onMessageComplete(int messageLength);
+
+    /**
+     * Need more data to be available
+     */
+    public void onMoreDataNeeded();
+
+    /**
+     * Need more data to be available, with determined more data length given
+     */
+    public void onMoreDataNeeded(int needDataLength);
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/StreamingDecoder.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/StreamingDecoder.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/StreamingDecoder.java
new file mode 100644
index 0000000..ed9eb72
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/StreamingDecoder.java
@@ -0,0 +1,26 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import java.nio.ByteBuffer;
+
+public interface StreamingDecoder {
+    public void decode(ByteBuffer streamingBuffer, DecodingCallback callback);
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAcceptor.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAcceptor.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAcceptor.java
new file mode 100644
index 0000000..0d5ae05
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAcceptor.java
@@ -0,0 +1,112 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+public class TcpAcceptor extends Acceptor {
+
+    public TcpAcceptor(StreamingDecoder streamingDecoder) {
+        this(new TcpTransportHandler(streamingDecoder));
+    }
+
+    public TcpAcceptor(TcpTransportHandler transportHandler) {
+        super(transportHandler);
+
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() == TcpEventType.ADDRESS_BIND) {
+                    try {
+                        doBind((AddressEvent) event);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {
+                        TcpEventType.ADDRESS_BIND
+                };
+            }
+        });
+    }
+
+    @Override
+    protected void doListen(InetSocketAddress socketAddress) {
+        AddressEvent event = TcpAddressEvent.createAddressBindEvent(socketAddress);
+        dispatch(event);
+    }
+
+    @Override
+    protected void dealKey(SelectionKey selectionKey) throws IOException {
+        if (selectionKey.isAcceptable()) {
+            doAccept(selectionKey);
+        } else {
+            super.dealKey(selectionKey);
+        }
+    }
+
+    void doAccept(SelectionKey key) throws IOException {
+        ServerSocketChannel server = (ServerSocketChannel) key.channel();
+        SocketChannel channel;
+
+        try {
+            while ((channel = server.accept()) != null) {
+                channel.configureBlocking(false);
+                channel.socket().setTcpNoDelay(true);
+                channel.socket().setKeepAlive(true);
+
+                Transport transport = new TcpTransport(channel,
+                    ((TcpTransportHandler) transportHandler).getStreamingDecoder());
+
+                channel.register(selector,
+                    SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+                onNewTransport(transport);
+            }
+        } catch (ClosedByInterruptException e) {
+            // No op as normal
+        }
+    }
+
+    protected void doBind(AddressEvent event) throws IOException {
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(event.getAddress());
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAddressEvent.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAddressEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAddressEvent.java
new file mode 100644
index 0000000..500d224
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAddressEvent.java
@@ -0,0 +1,36 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.net.InetSocketAddress;
+
+public class TcpAddressEvent {
+
+    public static AddressEvent createAddressBindEvent(InetSocketAddress address) {
+        return new AddressEvent(address, TcpEventType.ADDRESS_BIND);
+    }
+
+    public static AddressEvent createAddressConnectEvent(InetSocketAddress address) {
+        return new AddressEvent(address, TcpEventType.ADDRESS_CONNECT);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpConnector.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpConnector.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpConnector.java
new file mode 100644
index 0000000..e460961
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpConnector.java
@@ -0,0 +1,94 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+public class TcpConnector extends Connector {
+
+    public TcpConnector(StreamingDecoder streamingDecoder) {
+        this(new TcpTransportHandler(streamingDecoder));
+    }
+
+    public TcpConnector(TcpTransportHandler transportHandler) {
+        super(transportHandler);
+
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() ==  TcpEventType.ADDRESS_CONNECT) {
+                    doConnect((AddressEvent) event);
+                }
+            }
+
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {
+                        TcpEventType.ADDRESS_CONNECT
+                };
+            }
+        });
+    }
+
+    @Override
+    protected void doConnect(InetSocketAddress sa) {
+        AddressEvent event = TcpAddressEvent.createAddressConnectEvent(sa);
+        dispatch(event);
+    }
+
+    private void doConnect(AddressEvent event) throws IOException {
+        SocketChannel channel = SocketChannel.open();
+        channel.configureBlocking(false);
+        channel.connect(event.getAddress());
+        channel.register(selector,
+                SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+    }
+
+    @Override
+    protected void dealKey(SelectionKey selectionKey) throws IOException {
+        if (selectionKey.isConnectable()) {
+            doConnect(selectionKey);
+        } else {
+            super.dealKey(selectionKey);
+        }
+    }
+
+    private void doConnect(SelectionKey key) throws IOException {
+        SocketChannel channel = (SocketChannel) key.channel();
+        if (channel.isConnectionPending()) {
+            channel.finishConnect();
+        }
+
+        Transport transport = new TcpTransport(channel,
+                ((TcpTransportHandler) transportHandler).getStreamingDecoder());
+        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+        onNewTransport(transport);
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpEventType.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpEventType.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpEventType.java
new file mode 100644
index 0000000..e754fa5
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpEventType.java
@@ -0,0 +1,27 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.EventType;
+
+public enum TcpEventType implements EventType {
+    ADDRESS_BIND,
+    ADDRESS_CONNECT
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransport.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransport.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransport.java
new file mode 100644
index 0000000..a662e38
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransport.java
@@ -0,0 +1,110 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.buffer.BufferPool;
+import org.apache.kerby.transport.buffer.RecvBuffer;
+import org.apache.kerby.transport.event.MessageEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+public class TcpTransport extends Transport {
+
+    private SocketChannel channel;
+
+    private StreamingDecoder streamingDecoder;
+
+    private RecvBuffer recvBuffer;
+
+    public TcpTransport(SocketChannel channel,
+                        StreamingDecoder streamingDecoder) throws IOException {
+        super((InetSocketAddress) channel.getRemoteAddress());
+        this.channel = channel;
+        this.streamingDecoder = streamingDecoder;
+
+        this.recvBuffer = new RecvBuffer();
+    }
+
+    @Override
+    protected void sendOutMessage(ByteBuffer message) throws IOException {
+        channel.write(message);
+    }
+
+    public void onReadable() throws IOException {
+        ByteBuffer writeBuffer = BufferPool.allocate(65536);
+        if (channel.read(writeBuffer) <= 0) {
+            BufferPool.release(writeBuffer);
+            return;
+        }
+
+        writeBuffer.flip();
+        recvBuffer.write(writeBuffer);
+
+        WithReadDataHander rdHandler = new WithReadDataHander();
+        rdHandler.handle();
+    }
+
+    class WithReadDataHander implements DecodingCallback {
+        private ByteBuffer streamingBuffer;
+
+        @Override
+        public void onMessageComplete(int messageLength) {
+            ByteBuffer message = null;
+
+            int remaining = streamingBuffer.remaining();
+            if (remaining == messageLength) {
+                message = streamingBuffer;
+            } else if (remaining > messageLength) {
+                message = streamingBuffer.duplicate();
+                int newLimit = streamingBuffer.position() + messageLength;
+                message.limit(newLimit);
+
+                streamingBuffer.position(newLimit);
+                recvBuffer.writeFirst(streamingBuffer);
+            }
+
+            if (message != null) {
+                dispatcher.dispatch(MessageEvent.createInboundMessageEvent(TcpTransport.this, message));
+            }
+        }
+
+        @Override
+        public void onMoreDataNeeded() {
+            recvBuffer.writeFirst(streamingBuffer);
+        }
+
+        @Override
+        public void onMoreDataNeeded(int needDataLength) {
+            recvBuffer.writeFirst(streamingBuffer);
+        }
+
+        public void handle() {
+            if (recvBuffer.isEmpty()) return;
+
+            streamingBuffer = recvBuffer.readMostBytes();
+
+            streamingDecoder.decode(streamingBuffer.duplicate(), this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransportHandler.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransportHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransportHandler.java
new file mode 100644
index 0000000..ad010ed
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransportHandler.java
@@ -0,0 +1,77 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.TransportHandler;
+import org.apache.kerby.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+public class TcpTransportHandler extends TransportHandler {
+
+    private StreamingDecoder streamingDecoder;
+
+    public TcpTransportHandler(StreamingDecoder streamingDecoder) {
+        this.streamingDecoder = streamingDecoder;
+    }
+
+    public StreamingDecoder getStreamingDecoder() {
+        return streamingDecoder;
+    }
+
+    @Override
+    public EventType[] getInterestedEvents() {
+        return new TransportEventType[] {
+                TransportEventType.TRANSPORT_READABLE,
+                TransportEventType.TRANSPORT_WRITABLE
+        };
+    }
+
+    @Override
+    protected void doHandle(Event event) throws Exception {
+        EventType eventType = event.getEventType();
+        TransportEvent te = (TransportEvent) event;
+        Transport transport = te.getTransport();
+        if (eventType == TransportEventType.TRANSPORT_READABLE) {
+            transport.onReadable();
+        } else if (eventType == TransportEventType.TRANSPORT_WRITABLE) {
+            transport.onWriteable();
+        }
+    }
+
+    @Override
+    public void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException {
+        if (selectionKey.isReadable()) {
+            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+            TcpTransport transport = (TcpTransport) selectionKey.attachment();
+            dispatch(TransportEvent.createReadableTransportEvent(transport));
+        } else if (selectionKey.isWritable()) {
+            selectionKey.interestOps(SelectionKey.OP_READ);
+            TcpTransport transport = (TcpTransport) selectionKey.attachment();
+            dispatch(TransportEvent.createWritableTransportEvent(transport));
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAcceptor.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAcceptor.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAcceptor.java
new file mode 100644
index 0000000..974f871
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAcceptor.java
@@ -0,0 +1,84 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+public class UdpAcceptor extends Acceptor {
+
+    private DatagramChannel serverChannel;
+
+    public UdpAcceptor() {
+        this(new UdpTransportHandler());
+    }
+
+    public UdpAcceptor(UdpTransportHandler udpTransportHandler) {
+        super(udpTransportHandler);
+
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() ==  UdpEventType.ADDRESS_BIND) {
+                    doBind((AddressEvent) event);
+                }
+            }
+
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {
+                        UdpEventType.ADDRESS_BIND
+                };
+            }
+        });
+    }
+
+    @Override
+    protected void doListen(InetSocketAddress socketAddress) {
+        AddressEvent event = UdpAddressEvent.createAddressBindEvent(socketAddress);
+        dispatch(event);
+    }
+
+    private void doBind(AddressEvent event) throws IOException {
+        serverChannel = DatagramChannel.open();
+        serverChannel.configureBlocking(false);
+        serverChannel.bind(event.getAddress());
+        serverChannel.register(selector, SelectionKey.OP_READ);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+
+        try {
+            serverChannel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAddressEvent.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAddressEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAddressEvent.java
new file mode 100644
index 0000000..b29100e
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAddressEvent.java
@@ -0,0 +1,36 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.net.InetSocketAddress;
+
+public class UdpAddressEvent {
+
+    public static AddressEvent createAddressBindEvent(InetSocketAddress address) {
+        return new AddressEvent(address, UdpEventType.ADDRESS_BIND);
+    }
+
+    public static AddressEvent createAddressConnectEvent(InetSocketAddress address) {
+        return new AddressEvent(address, UdpEventType.ADDRESS_CONNECT);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpChannelEvent.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpChannelEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpChannelEvent.java
new file mode 100644
index 0000000..e5dd7b2
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpChannelEvent.java
@@ -0,0 +1,47 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+
+import java.nio.channels.DatagramChannel;
+
+public class UdpChannelEvent extends Event {
+
+    private DatagramChannel channel;
+
+    private UdpChannelEvent(DatagramChannel channel, EventType eventType) {
+        super(eventType);
+        this.channel = channel;
+    }
+
+    public DatagramChannel getChannel() {
+        return channel;
+    }
+
+    public static UdpChannelEvent makeWritableChannelEvent(DatagramChannel channel) {
+        return new UdpChannelEvent(channel, UdpEventType.CHANNEL_WRITABLE);
+    }
+
+    public static UdpChannelEvent makeReadableChannelEvent(DatagramChannel channel) {
+        return new UdpChannelEvent(channel, UdpEventType.CHANNEL_READABLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpConnector.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpConnector.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpConnector.java
new file mode 100644
index 0000000..9234a8c
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpConnector.java
@@ -0,0 +1,76 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+public class UdpConnector extends Connector {
+
+    public UdpConnector() {
+        this(new UdpTransportHandler());
+    }
+
+    public UdpConnector(UdpTransportHandler transportHandler) {
+        super(transportHandler);
+
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() == UdpEventType.ADDRESS_CONNECT) {
+                    doConnect((AddressEvent) event);
+                }
+            }
+
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {
+                        UdpEventType.ADDRESS_CONNECT
+                };
+            }
+        });
+    }
+
+    @Override
+    protected void doConnect(InetSocketAddress sa) {
+        AddressEvent event = UdpAddressEvent.createAddressConnectEvent(sa);
+        dispatch(event);
+    }
+
+    private void doConnect(AddressEvent event) throws IOException {
+        InetSocketAddress address = event.getAddress();
+        DatagramChannel channel = DatagramChannel.open();
+        channel.configureBlocking(false);
+        channel.connect(address);
+
+        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+
+        UdpTransport transport = new UdpTransport(channel, address);
+        onNewTransport(transport);
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpEventType.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpEventType.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpEventType.java
new file mode 100644
index 0000000..d291f75
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpEventType.java
@@ -0,0 +1,29 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.EventType;
+
+public enum UdpEventType implements EventType {
+    ADDRESS_BIND,
+    ADDRESS_CONNECT,
+    CHANNEL_WRITABLE,
+    CHANNEL_READABLE
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransport.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransport.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransport.java
new file mode 100644
index 0000000..fe11a64
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransport.java
@@ -0,0 +1,65 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.buffer.TransBuffer;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+public class UdpTransport extends Transport {
+    private DatagramChannel channel;
+
+    protected TransBuffer recvBuffer;
+
+    public UdpTransport(DatagramChannel channel,
+                        InetSocketAddress remoteAddress) {
+        super(remoteAddress);
+        this.channel = channel;
+        this.recvBuffer = new TransBuffer();
+    }
+
+    protected void onRecvData(ByteBuffer data) {
+        if (data != null) {
+            recvBuffer.write(data);
+            dispatcher.dispatch(TransportEvent.createReadableTransportEvent(this));
+        }
+    }
+
+    @Override
+    public void onReadable() throws IOException {
+        super.onReadable();
+
+        if (! recvBuffer.isEmpty()) {
+            ByteBuffer message = recvBuffer.read();
+            dispatcher.dispatch(MessageEvent.createInboundMessageEvent(this, message));
+        }
+    }
+
+    @Override
+    protected void sendOutMessage(ByteBuffer message) throws IOException {
+        channel.send(message, getRemoteAddress());
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransportHandler.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransportHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransportHandler.java
new file mode 100644
index 0000000..fc02bf5
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransportHandler.java
@@ -0,0 +1,109 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.TransportHandler;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.util.HashMap;
+import java.util.Map;
+
+public class UdpTransportHandler extends TransportHandler {
+
+    protected Map<InetSocketAddress, UdpTransport> transports =
+            new HashMap<InetSocketAddress, UdpTransport>();
+
+    @Override
+    public EventType[] getInterestedEvents() {
+        return new EventType[] {
+                UdpEventType.CHANNEL_READABLE,
+                TransportEventType.TRANSPORT_WRITABLE,
+                TransportEventType.TRANSPORT_READABLE,
+                TransportEventType.NEW_TRANSPORT
+        };
+    }
+
+    @Override
+    protected void doHandle(Event event) throws Exception {
+        EventType eventType = event.getEventType();
+        if (eventType == UdpEventType.CHANNEL_READABLE) {
+            UdpChannelEvent ce = (UdpChannelEvent) event;
+            DatagramChannel channel = ce.getChannel();
+            doRead(channel);
+        } else if (eventType == TransportEventType.TRANSPORT_READABLE) {
+            TransportEvent te = (TransportEvent) event;
+            Transport transport = te.getTransport();
+            transport.onReadable();
+        } else if (eventType == TransportEventType.TRANSPORT_WRITABLE) {
+            TransportEvent te = (TransportEvent) event;
+            Transport transport = te.getTransport();
+            transport.onWriteable();
+        }  else if (eventType == TransportEventType.NEW_TRANSPORT) {
+            TransportEvent te = (TransportEvent) event;
+            Transport transport = te.getTransport();
+            if (transport instanceof UdpTransport) {
+                InetSocketAddress remoteAddress = transport.getRemoteAddress();
+                if (! transports.containsKey(remoteAddress)) {
+                    transports.put(remoteAddress, (UdpTransport) transport);
+                }
+            }
+        }
+    }
+
+    private void doRead(DatagramChannel channel) throws IOException {
+        ByteBuffer recvBuffer = ByteBuffer.allocate(65536); // to optimize
+        InetSocketAddress fromAddress = (InetSocketAddress) channel.receive(recvBuffer);
+        if (fromAddress != null) {
+            recvBuffer.flip();
+            UdpTransport transport = transports.get(fromAddress);
+            if (transport == null) {
+                // should be from acceptor
+                transport = new UdpTransport(channel, fromAddress);
+                transport.setDispatcher(getDispatcher());
+                dispatch(TransportEvent.createNewTransportEvent(transport));
+            }
+            transport.onRecvData(recvBuffer);
+        }
+    }
+
+    @Override
+    public void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException {
+        DatagramChannel channel =
+                (DatagramChannel) selectionKey.channel();
+
+        if (selectionKey.isReadable()) {
+            dispatch(UdpChannelEvent.makeReadableChannelEvent(channel));
+        } else if (selectionKey.isWritable()) {
+            dispatch(UdpChannelEvent.makeWritableChannelEvent(channel));
+        }
+        // Udp channel is always writable, so not usable
+        selectionKey.interestOps(SelectionKey.OP_READ);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/TestBuffer.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/TestBuffer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/TestBuffer.java
new file mode 100644
index 0000000..704afa9
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/TestBuffer.java
@@ -0,0 +1,50 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import org.apache.kerby.transport.buffer.RecvBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class TestBuffer {
+
+    @Test
+    public void testRecvBuffer() {
+        String testString = "HELLO WORLD";
+        ByteBuffer testMessage = ByteBuffer.wrap(testString.getBytes());
+        ByteBuffer tmp;
+
+        RecvBuffer testBuffer = new RecvBuffer();
+        testBuffer.write(testMessage);
+        tmp = testBuffer.readMostBytes();
+        Assert.assertArrayEquals(testString.getBytes(), tmp.array());
+
+        int nTimes = 10;
+        testBuffer.clear();
+        for (int i = 0; i < nTimes; ++i) {
+            testBuffer.write(ByteBuffer.wrap(testString.getBytes()));
+        }
+        int expectedBytes = nTimes * testMessage.limit();
+        tmp = testBuffer.readMostBytes();
+        Assert.assertEquals(expectedBytes, tmp.limit());
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
new file mode 100644
index 0000000..7526658
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
@@ -0,0 +1,58 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.tcp.DecodingCallback;
+import org.apache.kerby.transport.tcp.StreamingDecoder;
+
+import java.nio.ByteBuffer;
+
+public class TestNetworkBase {
+    protected String serverHost = "127.0.0.1";
+    protected short tcpPort = 8183;
+    protected short udpPort = 8184;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return new String(bytes);
+    }
+
+    protected StreamingDecoder createStreamingDecoder() {
+        return new StreamingDecoder() {
+            @Override
+            public void decode(ByteBuffer streamingBuffer, DecodingCallback callback) {
+                int expectedMessageLength = TEST_MESSAGE.getBytes().length;
+                if (streamingBuffer.remaining() >= expectedMessageLength) {
+                    callback.onMessageComplete(expectedMessageLength);
+                } else {
+                    callback.onMoreDataNeeded(expectedMessageLength);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
new file mode 100644
index 0000000..7074a81
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
@@ -0,0 +1,212 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import junit.framework.Assert;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Network;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.Set;
+
+public class TestNetworkClient extends TestNetworkBase {
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+
+    @Before
+    public void setUp() throws IOException {
+        setUpServer();
+        setUpClient();
+    }
+
+    private void setUpServer() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunTcpServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunUdpServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    private void doRunTcpServer() throws IOException {
+        ServerSocketChannel serverSocketChannel;
+        Selector selector = Selector.open();
+        serverSocketChannel = ServerSocketChannel .open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(tcpPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+        SocketChannel socketChannel;
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+
+                    if (selectionKey.isAcceptable()) {
+                        while ((socketChannel = serverSocketChannel.accept()) != null) {
+                            socketChannel.configureBlocking(false);
+                            socketChannel.socket().setTcpNoDelay(true);
+                            socketChannel.socket().setKeepAlive(true);
+                            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, socketChannel);
+                            //selectionKey.attach(socketChannel);
+                        }
+                    } else if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        socketChannel = (SocketChannel) selectionKey.attachment();
+                        if (socketChannel.read(recvBuffer) > 0) {
+                            recvBuffer.flip();
+                            socketChannel.write(recvBuffer);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void doRunUdpServer() throws IOException {
+        DatagramChannel serverSocketChannel;
+        Selector selector = Selector.open();
+        serverSocketChannel = DatagramChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        DatagramSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(udpPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_READ);
+
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+                    if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        InetSocketAddress fromAddress = (InetSocketAddress) serverSocketChannel.receive(recvBuffer);
+                        if (fromAddress != null) {
+                            recvBuffer.flip();
+                            serverSocketChannel.send(recvBuffer, fromAddress);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent event) {
+                if (event.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = event.getMessage();
+                    if (buffer != null) {
+                        clientRecvedMessage = recvBuffer2String(buffer);
+                        System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                        Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                        dispatch(new Event(TestEventType.FINISHED, result));
+                    }
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(createStreamingDecoder());
+        eventHub.register(network);
+
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        network.tcpConnect(serverHost, tcpPort);
+        network.udpConnect(serverHost, udpPort);
+    }
+
+    @Test
+    public void testNetworkClient() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        Assert.assertTrue((Boolean) event.getEventData());
+
+        event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        Assert.assertTrue((Boolean) event.getEventData());
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
new file mode 100644
index 0000000..eefc7a3
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
@@ -0,0 +1,110 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import junit.framework.Assert;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Network;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+
+public class TestNetworkServer extends TestNetworkBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        setUpServer();
+    }
+
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(createStreamingDecoder());
+        eventHub.register(network);
+
+        eventHub.start();
+        network.tcpListen(serverHost, tcpPort);
+        network.udpListen(serverHost, udpPort);
+    }
+
+    @Test
+    public void testNetworkServer() throws IOException, InterruptedException {
+        testTcpTransport();
+        testUdpTransport();
+    }
+
+    private void testTcpTransport() throws IOException, InterruptedException {
+        Thread.sleep(10);
+
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(true);
+        SocketAddress sa = new InetSocketAddress(serverHost, tcpPort);
+        socketChannel.connect(sa);
+        socketChannel.write(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+        socketChannel.read(byteBuffer);
+        byteBuffer.flip();
+        clientRecvedMessage = recvBuffer2String(byteBuffer);
+        Assert.assertEquals(TEST_MESSAGE, clientRecvedMessage);
+    }
+
+    private void testUdpTransport() throws IOException, InterruptedException {
+        Thread.sleep(10);
+
+        DatagramChannel socketChannel = DatagramChannel.open();
+        socketChannel.configureBlocking(true);
+        SocketAddress sa = new InetSocketAddress(serverHost, udpPort);
+        socketChannel.send(ByteBuffer.wrap(TEST_MESSAGE.getBytes()), sa);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+        socketChannel.receive(byteBuffer);
+        byteBuffer.flip();
+        clientRecvedMessage = recvBuffer2String(byteBuffer);
+        Assert.assertEquals(TEST_MESSAGE, clientRecvedMessage);
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
new file mode 100644
index 0000000..263b7fc
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
@@ -0,0 +1,57 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.tcp.DecodingCallback;
+import org.apache.kerby.transport.tcp.StreamingDecoder;
+
+import java.nio.ByteBuffer;
+
+public class TestTcpBase {
+    protected String serverHost = "127.0.0.1";
+    protected short serverPort = 8181;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return new String(bytes);
+    }
+
+    protected StreamingDecoder createStreamingDecoder() {
+        return new StreamingDecoder() {
+            @Override
+            public void decode(ByteBuffer streamingBuffer, DecodingCallback callback) {
+                int expectedMessageLength = TEST_MESSAGE.getBytes().length;
+                if (streamingBuffer.remaining() >= expectedMessageLength) {
+                    callback.onMessageComplete(expectedMessageLength);
+                } else {
+                    callback.onMoreDataNeeded(expectedMessageLength);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
new file mode 100644
index 0000000..b0bd3ff
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
@@ -0,0 +1,160 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import junit.framework.Assert;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.tcp.TcpConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+public class TestTcpClient extends TestTcpBase {
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+
+    @Before
+    public void setUp() throws IOException {
+        setUpServer();
+        setUpClient();
+    }
+
+    private void setUpServer() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    private void doRunServer() throws IOException {
+        ServerSocketChannel serverSocketChannel;
+        Selector selector = Selector.open();
+        serverSocketChannel = ServerSocketChannel .open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(serverPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+        SocketChannel socketChannel;
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+
+                    if (selectionKey.isAcceptable()) {
+                        while ((socketChannel = serverSocketChannel.accept()) != null) {
+                            socketChannel.configureBlocking(false);
+                            socketChannel.socket().setTcpNoDelay(true);
+                            socketChannel.socket().setKeepAlive(true);
+                            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, socketChannel);
+                            //selectionKey.attach(socketChannel);
+                        }
+                    } else if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        socketChannel = (SocketChannel) selectionKey.attachment();
+                        if (socketChannel.read(recvBuffer) > 0) {
+                            recvBuffer.flip();
+                            socketChannel.write(recvBuffer);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent event) {
+                if (event.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = event.getMessage();
+                    clientRecvedMessage = recvBuffer2String(buffer);
+                    System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                    Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                    dispatch(new Event(TestEventType.FINISHED, result));
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Connector connector = new TcpConnector(createStreamingDecoder());
+        eventHub.register(connector);
+
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        connector.connect(serverHost, serverPort);
+    }
+
+    @Test
+    public void testTcpTransport() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        Assert.assertTrue((Boolean) event.getEventData());
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
new file mode 100644
index 0000000..43ebe20
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
@@ -0,0 +1,90 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import junit.framework.Assert;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.tcp.TcpAcceptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+public class TestTcpServer extends TestTcpBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        setUpServer();
+    }
+
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Acceptor acceptor = new TcpAcceptor(createStreamingDecoder());
+        eventHub.register(acceptor);
+
+        eventHub.start();
+        acceptor.listen(serverHost, serverPort);
+    }
+
+    @Test
+    public void testTcpTransport() throws IOException, InterruptedException {
+        Thread.sleep(15);
+
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(true);
+        SocketAddress sa = new InetSocketAddress(serverHost, serverPort);
+        socketChannel.connect(sa);
+        socketChannel.write(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+        socketChannel.read(byteBuffer);
+        byteBuffer.flip();
+        clientRecvedMessage = recvBuffer2String(byteBuffer);
+
+        Assert.assertEquals(TEST_MESSAGE, clientRecvedMessage);
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/7d9261af/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
new file mode 100644
index 0000000..63d71ac
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
@@ -0,0 +1,41 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.udp;
+
+import org.apache.kerby.event.EventType;
+
+import java.nio.ByteBuffer;
+
+public class TestUdpBase {
+    protected String serverHost = "127.0.0.1";
+    protected short serverPort = 8181;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return new String(bytes);
+    }
+}