You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by dr...@apache.org on 2015/01/12 14:06:12 UTC
[04/50] [abbrv] directory-kerberos git commit: Renaming packages in
contrib projects, using "apache"
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/event/EventHub.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/event/EventHub.java b/contrib/haox-event/src/main/java/org/apache/haox/event/EventHub.java
new file mode 100644
index 0000000..08172bc
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/event/EventHub.java
@@ -0,0 +1,173 @@
+package org.apache.haox.event;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class EventHub implements Dispatcher {
+
+ private enum BuiltInEventType implements EventType {
+ STOP,
+ ALL
+ }
+
+ private boolean started = false;
+
+ private Map<Integer, InternalEventHandler> handlers =
+ new ConcurrentHashMap<Integer, InternalEventHandler>();
+
+ private Map<EventType, Set<Integer>> eventHandlersMap =
+ new ConcurrentHashMap<EventType, Set<Integer>>();
+
+ private InternalEventHandler builtInHandler;
+
+ class BuiltInEventHandler extends AbstractEventHandler {
+ public BuiltInEventHandler() {
+ super();
+ }
+
+ @Override
+ protected void doHandle(Event event) {
+
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return BuiltInEventType.values();
+ }
+ }
+
+ public EventHub() {
+ init();
+ }
+
+ private void init() {
+ EventHandler eh = new BuiltInEventHandler();
+ builtInHandler = new ExecutedEventHandler(eh);
+ register(builtInHandler);
+ }
+
+ @Override
+ public void dispatch(Event event) {
+ process(event);
+ }
+
+ @Override
+ public void register(EventHandler handler) {
+ handler.setDispatcher(this);
+ InternalEventHandler ieh = new ExecutedEventHandler(handler);
+ register(ieh);
+ }
+
+ @Override
+ public void register(InternalEventHandler handler) {
+ handler.setDispatcher(this);
+ handler.init();
+ handlers.put(handler.id(), handler);
+
+ if (started) {
+ handler.start();
+ }
+
+ EventType[] interestedEvents = handler.getInterestedEvents();
+ Set<Integer> tmpHandlers;
+ for (EventType eventType : interestedEvents) {
+ if (eventHandlersMap.containsKey(eventType)) {
+ tmpHandlers = eventHandlersMap.get(eventType);
+ } else {
+ tmpHandlers = new HashSet<Integer>();
+ eventHandlersMap.put(eventType, tmpHandlers);
+ }
+ tmpHandlers.add(handler.id());
+ }
+ }
+
+ public EventWaiter waitEvent(final EventType event) {
+ return waitEvent(new EventType[] { event } );
+ }
+
+ public EventWaiter waitEvent(final EventType... events) {
+ EventHandler handler = new AbstractEventHandler() {
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ // no op;
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return events;
+ }
+ };
+
+ handler.setDispatcher(this);
+ final WaitEventHandler waitEventHandler = new WaitEventHandler(handler);
+ register(waitEventHandler);
+ EventWaiter waiter = new EventWaiter() {
+ @Override
+ public Event waitEvent(EventType event) {
+ return waitEventHandler.waitEvent(event);
+ }
+
+ @Override
+ public Event waitEvent() {
+ return waitEventHandler.waitEvent();
+ }
+
+ @Override
+ public Event waitEvent(EventType event, long timeout,
+ TimeUnit timeUnit) throws TimeoutException {
+ return waitEventHandler.waitEvent(event, timeout, timeUnit);
+ }
+
+ @Override
+ public Event waitEvent(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ return waitEventHandler.waitEvent(timeout, timeUnit);
+ }
+ };
+
+ return waiter;
+ }
+
+ private void process(Event event) {
+ EventType eventType = event.getEventType();
+ InternalEventHandler handler;
+ Set<Integer> handlerIds;
+
+ if (eventHandlersMap.containsKey(eventType)) {
+ handlerIds = eventHandlersMap.get(eventType);
+ for (Integer hid : handlerIds) {
+ handler = handlers.get(hid);
+ handler.handle(event);
+ }
+ }
+
+ if (eventHandlersMap.containsKey(BuiltInEventType.ALL)) {
+ handlerIds = eventHandlersMap.get(BuiltInEventType.ALL);
+ for (Integer hid : handlerIds) {
+ handler = handlers.get(hid);
+ handler.handle(event);
+ }
+ }
+ }
+
+ public void start() {
+ if (!started) {
+ for (InternalEventHandler handler : handlers.values()) {
+ handler.start();
+ }
+ started = true;
+ }
+ }
+
+ public void stop() {
+ if (started) {
+ for (InternalEventHandler handler : handlers.values()) {
+ handler.stop();
+ }
+ started = false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/event/EventType.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/event/EventType.java b/contrib/haox-event/src/main/java/org/apache/haox/event/EventType.java
new file mode 100644
index 0000000..2ab4f02
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/event/EventType.java
@@ -0,0 +1,5 @@
+package org.apache.haox.event;
+
+public interface EventType {
+ // no op
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/event/EventWaiter.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/event/EventWaiter.java b/contrib/haox-event/src/main/java/org/apache/haox/event/EventWaiter.java
new file mode 100644
index 0000000..5a0fd53
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/event/EventWaiter.java
@@ -0,0 +1,16 @@
+package org.apache.haox.event;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public interface EventWaiter {
+
+ public abstract Event waitEvent(EventType event);
+
+ public abstract Event waitEvent();
+
+ public abstract Event waitEvent(EventType event, long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+ public abstract Event waitEvent(long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/event/ExecutedEventHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/event/ExecutedEventHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/event/ExecutedEventHandler.java
new file mode 100644
index 0000000..ae7d7fd
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/event/ExecutedEventHandler.java
@@ -0,0 +1,53 @@
+package org.apache.haox.event;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An EventHandler wrapper processing events using an ExecutorService
+ */
+public class ExecutedEventHandler extends AbstractInternalEventHandler {
+
+ private ExecutorService executorService;
+
+ public ExecutedEventHandler(EventHandler handler) {
+ super(handler);
+ }
+
+ @Override
+ protected void doHandle(final Event event) throws Exception {
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ process(event);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public void start() {
+ executorService = Executors.newFixedThreadPool(2);
+ }
+
+ @Override
+ public void stop() {
+ if (executorService.isShutdown()) {
+ return;
+ }
+ executorService.shutdownNow();
+ }
+
+ @Override
+ public boolean isStopped() {
+ return executorService.isShutdown();
+ }
+
+ @Override
+ public void init() {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/event/InternalEventHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/event/InternalEventHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/event/InternalEventHandler.java
new file mode 100644
index 0000000..8137427
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/event/InternalEventHandler.java
@@ -0,0 +1,15 @@
+package org.apache.haox.event;
+
+public interface InternalEventHandler extends EventHandler {
+
+ public int id();
+
+ public void init();
+
+ public void start();
+
+ public void stop();
+
+ public boolean isStopped();
+}
+
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/event/LongRunningEventHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/event/LongRunningEventHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/event/LongRunningEventHandler.java
new file mode 100644
index 0000000..a8d5726
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/event/LongRunningEventHandler.java
@@ -0,0 +1,58 @@
+package org.apache.haox.event;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public abstract class LongRunningEventHandler extends BufferedEventHandler {
+
+ private ExecutorService executorService;
+
+ public LongRunningEventHandler(EventHandler handler) {
+ super(handler);
+ }
+
+ public LongRunningEventHandler() {
+ super();
+ }
+
+ protected abstract void loopOnce();
+
+ @Override
+ public void start() {
+ executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+
+ processEvents();
+
+ loopOnce();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void stop() {
+ if (executorService.isShutdown()) {
+ return;
+ }
+ executorService.shutdownNow();
+ }
+
+ @Override
+ public boolean isStopped() {
+ return executorService.isShutdown();
+ }
+
+ protected void processEvents() {
+ while (! eventQueue.isEmpty()) {
+ try {
+ process(eventQueue.take());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/event/WaitEventHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/event/WaitEventHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/event/WaitEventHandler.java
new file mode 100644
index 0000000..75d76a9
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/event/WaitEventHandler.java
@@ -0,0 +1,109 @@
+package org.apache.haox.event;
+
+import java.util.concurrent.*;
+
+public class WaitEventHandler extends BufferedEventHandler {
+
+ private ExecutorService executorService;
+
+ public WaitEventHandler(EventHandler handler) {
+ super(handler);
+ }
+
+ public Event waitEvent() {
+ return waitEvent(null);
+ }
+
+ public Event waitEvent(final EventType eventType) {
+ Future<Event> future = doWaitEvent(eventType);
+
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Event waitEvent(final EventType eventType,
+ long timeout, TimeUnit timeUnit) throws TimeoutException {
+ Future<Event> future = doWaitEvent(eventType);
+
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Event waitEvent(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ Future<Event> future = doWaitEvent(null);
+
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Future<Event> doWaitEvent(final EventType eventType) {
+ Future<Event> future = executorService.submit(new Callable<Event>() {
+ @Override
+ public Event call() throws Exception {
+ if (eventType != null) {
+ return checkEvent(eventType);
+ } else {
+ return checkEvent();
+ }
+ }
+ });
+
+ return future;
+ }
+
+ private Event checkEvent() throws Exception {
+ return eventQueue.take();
+ }
+
+ private Event checkEvent(EventType eventType) throws Exception {
+ Event event = null;
+
+ while (true) {
+ if (eventQueue.size() == 1) {
+ if (eventQueue.peek().getEventType() == eventType) {
+ return eventQueue.take();
+ }
+ } else {
+ event = eventQueue.take();
+ if (event.getEventType() == eventType) {
+ return event;
+ } else {
+ eventQueue.put(event); // put back since not wanted
+ }
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ executorService = Executors.newFixedThreadPool(2);
+ }
+
+ @Override
+ public void stop() {
+ if (executorService.isShutdown()) {
+ return;
+ }
+ executorService.shutdown();
+ }
+
+ @Override
+ public boolean isStopped() {
+ return executorService.isShutdown();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/Acceptor.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/Acceptor.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/Acceptor.java
new file mode 100644
index 0000000..0888b18
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/Acceptor.java
@@ -0,0 +1,17 @@
+package org.apache.haox.transport;
+
+import java.net.InetSocketAddress;
+
+public abstract class Acceptor extends TransportSelector {
+
+ public Acceptor(TransportHandler transportHandler) {
+ super(transportHandler);
+ }
+
+ public void listen(String address, short listenPort) {
+ InetSocketAddress socketAddress = new InetSocketAddress(address, listenPort);
+ doListen(socketAddress);
+ }
+
+ protected abstract void doListen(InetSocketAddress socketAddress);
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/BytesUtil.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/BytesUtil.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/BytesUtil.java
new file mode 100644
index 0000000..c3a3e99
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/BytesUtil.java
@@ -0,0 +1,144 @@
+package org.apache.haox.transport;
+
+public class BytesUtil {
+
+ public static short bytes2short(byte[] bytes, int offset, boolean bigEndian) {
+ short val = 0;
+
+ if (bigEndian) {
+ val += (bytes[offset + 0] & 0xff) << 8;
+ val += (bytes[offset + 1] & 0xff);
+ } else {
+ val += (bytes[offset + 1] & 0xff) << 8;
+ val += (bytes[offset + 0] & 0xff);
+ }
+
+ return val;
+ }
+
+ public static short bytes2short(byte[] bytes, boolean bigEndian) {
+ return bytes2short(bytes, 0, bigEndian);
+ }
+
+ public static byte[] short2bytes(int val, boolean bigEndian) {
+ byte[] bytes = new byte[2];
+
+ short2bytes(val, bytes, 0, bigEndian);
+
+ return bytes;
+ }
+
+ public static void short2bytes(int val, byte[] bytes, int offset, boolean bigEndian) {
+ if (bigEndian) {
+ bytes[offset + 0] = (byte) ((val >> 8) & 0xff);
+ bytes[offset + 1] = (byte) ((val) & 0xff);
+ } else {
+ bytes[offset + 1] = (byte) ((val >> 8) & 0xff);
+ bytes[offset + 0] = (byte) ((val ) & 0xff);
+ }
+ }
+
+ public static int bytes2int(byte[] bytes, boolean bigEndian) {
+ return bytes2int(bytes, 0, bigEndian);
+ }
+
+ public static int bytes2int(byte[] bytes, int offset, boolean bigEndian) {
+ int val = 0;
+
+ if (bigEndian) {
+ val += (bytes[offset + 0] & 0xff) << 24;
+ val += (bytes[offset + 1] & 0xff) << 16;
+ val += (bytes[offset + 2] & 0xff) << 8;
+ val += (bytes[offset + 3] & 0xff);
+ } else {
+ val += (bytes[offset + 3] & 0xff) << 24;
+ val += (bytes[offset + 2] & 0xff) << 16;
+ val += (bytes[offset + 1] & 0xff) << 8;
+ val += (bytes[offset + 0] & 0xff);
+ }
+
+ return val;
+ }
+
+ public static byte[] int2bytes(int val, boolean bigEndian) {
+ byte[] bytes = new byte[4];
+
+ int2bytes(val, bytes, 0, bigEndian);
+
+ return bytes;
+ }
+
+ public static void int2bytes(int val, byte[] bytes, int offset, boolean bigEndian) {
+ if (bigEndian) {
+ bytes[offset + 0] = (byte) ((val >> 24) & 0xff);
+ bytes[offset + 1] = (byte) ((val >> 16) & 0xff);
+ bytes[offset + 2] = (byte) ((val >> 8) & 0xff);
+ bytes[offset + 3] = (byte) ((val) & 0xff);
+ } else {
+ bytes[offset + 3] = (byte) ((val >> 24) & 0xff);
+ bytes[offset + 2] = (byte) ((val >> 16) & 0xff);
+ bytes[offset + 1] = (byte) ((val >> 8) & 0xff);
+ bytes[offset + 0] = (byte) ((val) & 0xff);
+ }
+ }
+
+ public static byte[] long2bytes(long val, boolean bigEndian) {
+ byte[] bytes = new byte[8];
+ long2bytes(val, bytes, 0, bigEndian);
+ return bytes;
+ }
+
+ public static void long2bytes(long val, byte[] bytes, int offset, boolean bigEndian) {
+ if (bigEndian) {
+ for (int i = 0; i < 8; i++) {
+ bytes[i + offset] = (byte) ((val >> ((7 - i) * 8)) & 0xffL);
+ }
+ } else {
+ for (int i = 0; i < 8; i++) {
+ bytes[i + offset] = (byte) ((val >> (i * 8)) & 0xffL);
+ }
+ }
+ }
+
+ public static long bytes2long(byte[] bytes, boolean bigEndian) {
+ return bytes2long(bytes, 0, bigEndian);
+ }
+
+ public static long bytes2long(byte[] bytes, int offset, boolean bigEndian) {
+ long val = 0;
+
+ if (bigEndian) {
+ for (int i = 0; i < 8; i++) {
+ val |= (((long) bytes[i + offset]) & 0xffL) << ((7 - i) * 8);
+ }
+ } else {
+ for (int i = 0; i < 8; i++) {
+ val |= (((long) bytes[i + offset]) & 0xffL) << (i * 8);
+ }
+ }
+
+ return val;
+ }
+
+ public static byte[] padding(byte[] data, int block) {
+ int len = data.length;
+ int paddingLen = len % block != 0 ? 8 - len % block : 0;
+ if (paddingLen == 0) {
+ return data;
+ }
+
+ byte[] result = new byte[len + + paddingLen];
+ System.arraycopy(data, 0, result, 0, len);
+ return result;
+ }
+
+ public static byte[] duplicate(byte[] bytes) {
+ return duplicate(bytes, 0, bytes.length);
+ }
+
+ public static byte[] duplicate(byte[] bytes, int offset, int len) {
+ byte[] dup = new byte[len];
+ System.arraycopy(bytes, offset, dup, 0, len);
+ return dup;
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/Connector.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/Connector.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/Connector.java
new file mode 100644
index 0000000..4ba18ae
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/Connector.java
@@ -0,0 +1,17 @@
+package org.apache.haox.transport;
+
+import java.net.InetSocketAddress;
+
+public abstract class Connector extends TransportSelector {
+
+ public Connector(TransportHandler transportHandler) {
+ super(transportHandler);
+ }
+
+ public void connect(String serverAddress, short serverPort) {
+ InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+ doConnect(sa);
+ }
+
+ protected abstract void doConnect(InetSocketAddress sa);
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/MessageHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/MessageHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/MessageHandler.java
new file mode 100644
index 0000000..1f61ca5
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/MessageHandler.java
@@ -0,0 +1,23 @@
+package org.apache.haox.transport;
+
+import org.apache.haox.event.AbstractEventHandler;
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.event.MessageEvent;
+import org.apache.haox.transport.event.TransportEventType;
+
+public abstract class MessageHandler extends AbstractEventHandler {
+
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ handleMessage((MessageEvent) event);
+ }
+
+ protected abstract void handleMessage(MessageEvent event) throws Exception;
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new EventType[] { TransportEventType.INBOUND_MESSAGE };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/Network.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/Network.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/Network.java
new file mode 100644
index 0000000..017f661
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/Network.java
@@ -0,0 +1,278 @@
+package org.apache.haox.transport;
+
+import org.apache.haox.event.AbstractEventHandler;
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.event.LongRunningEventHandler;
+import org.apache.haox.transport.event.AddressEvent;
+import org.apache.haox.transport.event.TransportEvent;
+import org.apache.haox.transport.tcp.*;
+import org.apache.haox.transport.udp.UdpAddressEvent;
+import org.apache.haox.transport.udp.UdpEventType;
+import org.apache.haox.transport.udp.UdpTransport;
+import org.apache.haox.transport.udp.UdpTransportHandler;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * A combined and mixed network facility handling UDP and TCP in both connect and accept sides
+ */
+public class Network extends LongRunningEventHandler {
+
+ private Selector selector;
+ private StreamingDecoder streamingDecoder;
+ private UdpTransportHandler udpTransportHandler;
+ private TcpTransportHandler tcpTransportHandler;
+
+ class MyEventHandler extends AbstractEventHandler {
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ if (event.getEventType() == UdpEventType.ADDRESS_CONNECT) {
+ doUdpConnect((AddressEvent) event);
+ } else if (event.getEventType() == UdpEventType.ADDRESS_BIND) {
+ doUdpBind((AddressEvent) event);
+ } else if (event.getEventType() == TcpEventType.ADDRESS_CONNECT) {
+ doTcpConnect((AddressEvent) event);
+ } else if (event.getEventType() == TcpEventType.ADDRESS_BIND) {
+ doTcpBind((AddressEvent) event);
+ }
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new EventType[]{
+ UdpEventType.ADDRESS_CONNECT,
+ UdpEventType.ADDRESS_BIND,
+ TcpEventType.ADDRESS_CONNECT,
+ TcpEventType.ADDRESS_BIND
+ };
+ }
+ }
+
+ public Network() {
+ setEventHandler(new MyEventHandler());
+ }
+
+ @Override
+ public void init() {
+ super.init();
+
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * TCP transport only, for decoding tcp streaming into messages
+ * @param streamingDecoder
+ */
+ public void setStreamingDecoder(StreamingDecoder streamingDecoder) {
+ this.streamingDecoder = streamingDecoder;
+ }
+
+ /**
+ * TCP only. Connect on the given server address. Can be called multiple times
+ * for multiple servers
+ * @param serverAddress
+ * @param serverPort
+ */
+ public void tcpConnect(String serverAddress, short serverPort) {
+ InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+ checkTcpTransportHandler();
+ doTcpConnect(sa);
+ }
+
+ /**
+ * UDP only. Connect on the given server address. Can be called multiple times
+ * for multiple servers
+ * @param serverAddress
+ * @param serverPort
+ */
+ public void udpConnect(String serverAddress, short serverPort) {
+ InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+ checkUdpTransportHandler();
+ doUdpConnect(sa);
+ }
+
+ /**
+ * TCP only. Listen and accept connections on the address. Can be called multiple
+ * times for multiple server addresses.
+ * @param serverAddress
+ * @param serverPort
+ */
+ public void tcpListen(String serverAddress, short serverPort) {
+ InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+ checkTcpTransportHandler();
+ doTcpListen(sa);
+ }
+
+ /**
+ * UDP only. Listen and accept connections on the address. Can be called multiple
+ * times for multiple server addresses.
+ * @param serverAddress
+ * @param serverPort
+ */
+ public void udpListen(String serverAddress, short serverPort) {
+ InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+ checkUdpTransportHandler();
+ doUdpListen(sa);
+ }
+
+ @Override
+ protected void loopOnce() {
+ try {
+ selectOnce();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void selectOnce() throws IOException {
+ if (selector.isOpen() && selector.select(2) > 0 && selector.isOpen()) {
+ Set<SelectionKey> selectionKeys = selector.selectedKeys();
+ Iterator<SelectionKey> iterator = selectionKeys.iterator();
+ while (iterator.hasNext()) {
+ SelectionKey selectionKey = iterator.next();
+ dealKey(selectionKey);
+ iterator.remove();
+ }
+ selectionKeys.clear();
+ }
+ }
+
+ private void checkTcpTransportHandler() {
+ if (tcpTransportHandler == null) {
+ if (streamingDecoder == null) {
+ throw new IllegalArgumentException("No streaming decoder set yet");
+ }
+ tcpTransportHandler = new TcpTransportHandler(streamingDecoder);
+ getDispatcher().register(tcpTransportHandler);
+ }
+ }
+
+ private void checkUdpTransportHandler() {
+ if (udpTransportHandler == null) {
+ udpTransportHandler = new UdpTransportHandler();
+ getDispatcher().register(udpTransportHandler);
+ }
+ }
+
+ private void dealKey(SelectionKey selectionKey) throws IOException {
+ if (selectionKey.isConnectable()) {
+ doTcpConnect(selectionKey);
+ } else if (selectionKey.isAcceptable()) {
+ doTcpAccept(selectionKey);
+ } else {
+ helpHandleSelectionKey(selectionKey);
+ }
+ }
+
+ private void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException {
+ SelectableChannel channel = selectionKey.channel();
+ if (channel instanceof DatagramChannel) {
+ udpTransportHandler.helpHandleSelectionKey(selectionKey);
+ } else {
+ tcpTransportHandler.helpHandleSelectionKey(selectionKey);
+ }
+ }
+
+ private void doUdpConnect(InetSocketAddress sa) {
+ AddressEvent event = UdpAddressEvent.createAddressConnectEvent(sa);
+ dispatch(event);
+ }
+
+ private void doUdpConnect(AddressEvent event) throws IOException {
+ InetSocketAddress address = event.getAddress();
+ DatagramChannel channel = DatagramChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(address);
+
+ channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+
+ UdpTransport transport = new UdpTransport(channel, address);
+ onNewTransport(transport);
+ }
+
+ protected void doUdpListen(InetSocketAddress socketAddress) {
+ AddressEvent event = UdpAddressEvent.createAddressBindEvent(socketAddress);
+ dispatch(event);
+ }
+
+ private void doUdpBind(AddressEvent event) throws IOException {
+ DatagramChannel serverChannel = DatagramChannel.open();
+ serverChannel.configureBlocking(false);
+ serverChannel.bind(event.getAddress());
+ serverChannel.register(selector, SelectionKey.OP_READ);
+ }
+
+ protected void doTcpConnect(InetSocketAddress sa) {
+ AddressEvent event = TcpAddressEvent.createAddressConnectEvent(sa);
+ dispatch(event);
+ }
+
+ private void doTcpConnect(AddressEvent event) throws IOException {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(event.getAddress());
+ channel.register(selector,
+ SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ }
+
+ private void doTcpConnect(SelectionKey key) throws IOException {
+ SocketChannel channel = (SocketChannel) key.channel();
+ if (channel.isConnectionPending()) {
+ channel.finishConnect();
+ }
+
+ Transport transport = new TcpTransport(channel, tcpTransportHandler.getStreamingDecoder());
+ channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+ onNewTransport(transport);
+ }
+
+ protected void doTcpListen(InetSocketAddress socketAddress) {
+ AddressEvent event = TcpAddressEvent.createAddressBindEvent(socketAddress);
+ dispatch(event);
+ }
+
+ void doTcpAccept(SelectionKey key) throws IOException {
+ ServerSocketChannel server = (ServerSocketChannel) key.channel();
+ SocketChannel channel;
+ while ((channel = server.accept()) != null) {
+ // Quick fix: avoid exception during exiting
+ if (! selector.isOpen()) {
+ channel.close();
+ break;
+ };
+
+ channel.configureBlocking(false);
+ channel.socket().setTcpNoDelay(true);
+ channel.socket().setKeepAlive(true);
+
+ Transport transport = new TcpTransport(channel,
+ tcpTransportHandler.getStreamingDecoder());
+ channel.register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+ onNewTransport(transport);
+ }
+ }
+
+ protected void doTcpBind(AddressEvent event) throws IOException {
+ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ ServerSocket serverSocket = serverSocketChannel.socket();
+ serverSocket.bind(event.getAddress());
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
+ }
+
+ private void onNewTransport(Transport transport) {
+ transport.setDispatcher(getDispatcher());
+ dispatch(TransportEvent.createNewTransportEvent(transport));
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/Transport.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/Transport.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/Transport.java
new file mode 100644
index 0000000..e32c8e9
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/Transport.java
@@ -0,0 +1,65 @@
+package org.apache.haox.transport;
+
+import org.apache.haox.event.Dispatcher;
+import org.apache.haox.transport.buffer.TransBuffer;
+import org.apache.haox.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+public abstract class Transport {
+ private InetSocketAddress remoteAddress;
+ protected Dispatcher dispatcher;
+ private Object attachment;
+
+ protected TransBuffer sendBuffer;
+
+ private int readableCount = 0;
+ private int writableCount = 0;
+
+ public Transport(InetSocketAddress remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ this.sendBuffer = new TransBuffer();
+ }
+
+ public void setDispatcher(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public InetSocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ public void sendMessage(ByteBuffer message) {
+ if (message != null) {
+ sendBuffer.write(message);
+ dispatcher.dispatch(TransportEvent.createWritableTransportEvent(this));
+ }
+ }
+
+ public void onWriteable() throws IOException {
+ this.writableCount ++;
+
+ if (! sendBuffer.isEmpty()) {
+ ByteBuffer message = sendBuffer.read();
+ if (message != null) {
+ sendOutMessage(message);
+ }
+ }
+ }
+
+ public void onReadable() throws IOException {
+ this.readableCount++;
+ }
+
+ protected abstract void sendOutMessage(ByteBuffer message) throws IOException;
+
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ public Object getAttachment() {
+ return attachment;
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportHandler.java
new file mode 100644
index 0000000..957360b
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportHandler.java
@@ -0,0 +1,15 @@
+package org.apache.haox.transport;
+
+import org.apache.haox.event.AbstractEventHandler;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+/**
+ * Handling readable and writable events
+ */
+public abstract class TransportHandler extends AbstractEventHandler {
+
+ public abstract void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportSelector.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportSelector.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportSelector.java
new file mode 100644
index 0000000..dbf1678
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/TransportSelector.java
@@ -0,0 +1,81 @@
+package org.apache.haox.transport;
+
+import org.apache.haox.event.Dispatcher;
+import org.apache.haox.event.LongRunningEventHandler;
+import org.apache.haox.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+public abstract class TransportSelector extends LongRunningEventHandler {
+
+ protected Selector selector;
+ protected TransportHandler transportHandler;
+
+ public TransportSelector(TransportHandler transportHandler) {
+ super();
+ this.transportHandler = transportHandler;
+ }
+
+ @Override
+ public void setDispatcher(Dispatcher dispatcher) {
+ super.setDispatcher(dispatcher);
+ dispatcher.register(transportHandler);
+ }
+
+ @Override
+ public void init() {
+ super.init();
+
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void loopOnce() {
+ try {
+ selectOnce();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void selectOnce() throws IOException {
+ if (selector.isOpen() && selector.select(10) > 0 && selector.isOpen()) {
+ Set<SelectionKey> selectionKeys = selector.selectedKeys();
+ Iterator<SelectionKey> iterator = selectionKeys.iterator();
+ while (iterator.hasNext()) {
+ SelectionKey selectionKey = iterator.next();
+ dealKey(selectionKey);
+ iterator.remove();
+ }
+ selectionKeys.clear();
+ }
+ }
+
+ protected void dealKey(SelectionKey selectionKey) throws IOException {
+ transportHandler.helpHandleSelectionKey(selectionKey);
+ }
+
+ protected void onNewTransport(Transport transport) {
+ transport.setDispatcher(getDispatcher());
+ dispatch(TransportEvent.createNewTransportEvent(transport));
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+
+ try {
+ selector.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferPool.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferPool.java
new file mode 100644
index 0000000..b8169c2
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferPool.java
@@ -0,0 +1,14 @@
+package org.apache.haox.transport.buffer;
+
+import java.nio.ByteBuffer;
+
+public class BufferPool {
+
+ public static ByteBuffer allocate(int len) {
+ return ByteBuffer.allocate(len);
+ }
+
+ public static void release(ByteBuffer buffer) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferUtil.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferUtil.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferUtil.java
new file mode 100644
index 0000000..d9b9279
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/BufferUtil.java
@@ -0,0 +1,23 @@
+package org.apache.haox.transport.buffer;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+public class BufferUtil {
+
+ /**
+ * Read len bytes from src buffer
+ */
+ public static ByteBuffer read(ByteBuffer src, int len) {
+ if (len > src.remaining())
+ throw new BufferOverflowException();
+
+ ByteBuffer result = ByteBuffer.allocate(len);
+ int n = src.remaining();
+ for (int i = 0; i < n; i++) {
+ result.put(src.get());
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/RecvBuffer.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/RecvBuffer.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/RecvBuffer.java
new file mode 100644
index 0000000..e12b9df
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/RecvBuffer.java
@@ -0,0 +1,136 @@
+package org.apache.haox.transport.buffer;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+public class RecvBuffer {
+
+ private LinkedList<ByteBuffer> bufferQueue;
+
+ public RecvBuffer() {
+ bufferQueue = new LinkedList<ByteBuffer>();
+ }
+
+ public synchronized void write(ByteBuffer buffer) {
+ bufferQueue.addLast(buffer);
+ }
+
+ /**
+ * Put buffer as the first into the buffer queue
+ */
+ public synchronized void writeFirst(ByteBuffer buffer) {
+ bufferQueue.addFirst(buffer);
+ }
+
+ /**
+ * Read and return the first buffer if available
+ */
+ public synchronized ByteBuffer readFirst() {
+ if (! bufferQueue.isEmpty()) {
+ return bufferQueue.removeFirst();
+ }
+ return null;
+ }
+
+ /**
+ * Read most available bytes into the dst buffer
+ */
+ public synchronized ByteBuffer readMostBytes() {
+ int len = remaining();
+ return readBytes(len);
+ }
+
+ /**
+ * Read len bytes into the dst buffer if available
+ */
+ public synchronized ByteBuffer readBytes(int len) {
+ if (remaining() < len) { // no enough data that's available
+ throw new BufferOverflowException();
+ }
+
+ ByteBuffer result = null;
+
+ ByteBuffer takenBuffer;
+ if (bufferQueue.size() == 1) {
+ takenBuffer = bufferQueue.removeFirst();
+
+ if (takenBuffer.remaining() == len) {
+ return takenBuffer;
+ }
+
+ result = BufferPool.allocate(len);
+ for (int i = 0; i < len; i++) {
+ result.put(takenBuffer.get());
+ }
+ // Has left bytes so put it back for future reading
+ if (takenBuffer.remaining() > 0) {
+ bufferQueue.addFirst(takenBuffer);
+ }
+ } else {
+ result = BufferPool.allocate(len);
+
+ Iterator<ByteBuffer> iter = bufferQueue.iterator();
+ int alreadyGot = 0, toGet;
+ while (iter.hasNext()) {
+ takenBuffer = iter.next();
+ iter.remove();
+
+ toGet = takenBuffer.remaining() < len - alreadyGot ?
+ takenBuffer.remaining() : len -alreadyGot;
+ byte[] toGetBytes = new byte[toGet];
+ takenBuffer.get(toGetBytes);
+ result.put(toGetBytes);
+
+ if (takenBuffer.remaining() > 0) {
+ bufferQueue.addFirst(takenBuffer);
+ }
+
+ alreadyGot += toGet;
+ if (alreadyGot == len) {
+ break;
+ }
+ }
+ }
+ result.flip();
+
+ return result;
+ }
+
+ public boolean isEmpty() {
+ return bufferQueue.isEmpty();
+ }
+
+ /**
+ * Return count of remaining and left bytes that's available
+ */
+ public int remaining() {
+ if (bufferQueue.isEmpty()) {
+ return 0;
+ } else if (bufferQueue.size() == 1) {
+ return bufferQueue.getFirst().remaining();
+ }
+
+ int result = 0;
+ Iterator<ByteBuffer> iter = bufferQueue.iterator();
+ while (iter.hasNext()) {
+ result += iter.next().remaining();
+ }
+ return result;
+ }
+
+ public synchronized void clear() {
+ if (bufferQueue.isEmpty()) {
+ return;
+ } else if (bufferQueue.size() == 1) {
+ BufferPool.release(bufferQueue.getFirst());
+ }
+
+ Iterator<ByteBuffer> iter = bufferQueue.iterator();
+ while (iter.hasNext()) {
+ BufferPool.release(iter.next());
+ }
+ bufferQueue.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/TransBuffer.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/TransBuffer.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/TransBuffer.java
new file mode 100644
index 0000000..869fe59
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/buffer/TransBuffer.java
@@ -0,0 +1,30 @@
+package org.apache.haox.transport.buffer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+public class TransBuffer {
+
+ private BlockingQueue<ByteBuffer> bufferQueue;
+
+ public TransBuffer() {
+ bufferQueue = new ArrayBlockingQueue<ByteBuffer>(2);
+ }
+
+ public void write(ByteBuffer buffer) {
+ bufferQueue.add(buffer);
+ }
+
+ public void write(byte[] buffer) {
+ write(ByteBuffer.wrap(buffer));
+ }
+
+ public ByteBuffer read() {
+ return bufferQueue.poll();
+ }
+
+ public boolean isEmpty() {
+ return bufferQueue.isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/event/AddressEvent.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/event/AddressEvent.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/AddressEvent.java
new file mode 100644
index 0000000..920b603
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/AddressEvent.java
@@ -0,0 +1,20 @@
+package org.apache.haox.transport.event;
+
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+
+import java.net.InetSocketAddress;
+
+public class AddressEvent extends Event {
+
+ private InetSocketAddress address;
+
+ public AddressEvent(InetSocketAddress address, EventType eventType) {
+ super(eventType);
+ this.address = address;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/event/MessageEvent.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/event/MessageEvent.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/MessageEvent.java
new file mode 100644
index 0000000..736e4ba
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/MessageEvent.java
@@ -0,0 +1,22 @@
+package org.apache.haox.transport.event;
+
+import org.apache.haox.transport.Transport;
+
+import java.nio.ByteBuffer;
+
+public class MessageEvent extends TransportEvent {
+
+ private MessageEvent(Transport transport, ByteBuffer message) {
+ super(transport, TransportEventType.INBOUND_MESSAGE, message);
+ }
+
+ public ByteBuffer getMessage() {
+ return (ByteBuffer) getEventData();
+ }
+
+ public static MessageEvent createInboundMessageEvent(
+ Transport transport, ByteBuffer message) {
+ return new MessageEvent(transport, message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEvent.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEvent.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEvent.java
new file mode 100644
index 0000000..68882fb
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEvent.java
@@ -0,0 +1,37 @@
+package org.apache.haox.transport.event;
+
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.Transport;
+
+public class TransportEvent extends Event {
+
+ private Transport transport;
+
+ public TransportEvent(Transport transport, EventType eventType) {
+ super(eventType);
+ this.transport = transport;
+ }
+
+ public TransportEvent(Transport transport, EventType eventType, Object eventData) {
+ super(eventType, eventData);
+ this.transport = transport;
+ }
+
+ public Transport getTransport() {
+ return transport;
+ }
+
+ public static TransportEvent createWritableTransportEvent(Transport transport) {
+ return new TransportEvent(transport, TransportEventType.TRANSPORT_WRITABLE);
+ }
+
+ public static TransportEvent createReadableTransportEvent(Transport transport) {
+ return new TransportEvent(transport, TransportEventType.TRANSPORT_READABLE);
+ }
+
+ public static TransportEvent createNewTransportEvent(Transport transport) {
+ return new TransportEvent(transport, TransportEventType.NEW_TRANSPORT);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEventType.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEventType.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEventType.java
new file mode 100644
index 0000000..457bf25
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/event/TransportEventType.java
@@ -0,0 +1,10 @@
+package org.apache.haox.transport.event;
+
+import org.apache.haox.event.EventType;
+
+public enum TransportEventType implements EventType {
+ NEW_TRANSPORT,
+ TRANSPORT_WRITABLE,
+ TRANSPORT_READABLE,
+ INBOUND_MESSAGE
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/DecodingCallback.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/DecodingCallback.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/DecodingCallback.java
new file mode 100644
index 0000000..b656159
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/DecodingCallback.java
@@ -0,0 +1,19 @@
+package org.apache.haox.transport.tcp;
+
+public interface DecodingCallback {
+
+ /**
+ * OK, enough data is ready, a message can be out
+ */
+ public void onMessageComplete(int messageLength);
+
+ /**
+ * Need more data to be available
+ */
+ public void onMoreDataNeeded();
+
+ /**
+ * Need more data to be available, with determined more data length given
+ */
+ public void onMoreDataNeeded(int needDataLength);
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/StreamingDecoder.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/StreamingDecoder.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/StreamingDecoder.java
new file mode 100644
index 0000000..2a90b94
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/StreamingDecoder.java
@@ -0,0 +1,7 @@
+package org.apache.haox.transport.tcp;
+
+import java.nio.ByteBuffer;
+
+public interface StreamingDecoder {
+ public void decode(ByteBuffer streamingBuffer, DecodingCallback callback);
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAcceptor.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAcceptor.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAcceptor.java
new file mode 100644
index 0000000..011f67b
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAcceptor.java
@@ -0,0 +1,92 @@
+package org.apache.haox.transport.tcp;
+
+import org.apache.haox.event.AbstractEventHandler;
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.Acceptor;
+import org.apache.haox.transport.Transport;
+import org.apache.haox.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+public class TcpAcceptor extends Acceptor {
+
+ public TcpAcceptor(StreamingDecoder streamingDecoder) {
+ this(new TcpTransportHandler(streamingDecoder));
+ }
+
+ public TcpAcceptor(TcpTransportHandler transportHandler) {
+ super(transportHandler);
+
+ setEventHandler(new AbstractEventHandler() {
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ if (event.getEventType() == TcpEventType.ADDRESS_BIND) {
+ try {
+ doBind((AddressEvent) event);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new EventType[] {
+ TcpEventType.ADDRESS_BIND
+ };
+ }
+ });
+ }
+
+ @Override
+ protected void doListen(InetSocketAddress socketAddress) {
+ AddressEvent event = TcpAddressEvent.createAddressBindEvent(socketAddress);
+ dispatch(event);
+ }
+
+ @Override
+ protected void dealKey(SelectionKey selectionKey) throws IOException {
+ if (selectionKey.isAcceptable()) {
+ doAccept(selectionKey);
+ } else {
+ super.dealKey(selectionKey);
+ }
+ }
+
+ void doAccept(SelectionKey key) throws IOException {
+ ServerSocketChannel server = (ServerSocketChannel) key.channel();
+ SocketChannel channel;
+ while ((channel = server.accept()) != null) {
+ // Quick fix: avoid exception during exiting
+ if (! selector.isOpen()) {
+ channel.close();
+ break;
+ };
+
+ channel.configureBlocking(false);
+ channel.socket().setTcpNoDelay(true);
+ channel.socket().setKeepAlive(true);
+
+ Transport transport = new TcpTransport(channel,
+ ((TcpTransportHandler) transportHandler).getStreamingDecoder());
+ channel.register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+ onNewTransport(transport);
+ }
+ }
+
+ protected void doBind(AddressEvent event) throws IOException {
+ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ ServerSocket serverSocket = serverSocketChannel.socket();
+ serverSocket.bind(event.getAddress());
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAddressEvent.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAddressEvent.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAddressEvent.java
new file mode 100644
index 0000000..23f3d31
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpAddressEvent.java
@@ -0,0 +1,17 @@
+package org.apache.haox.transport.tcp;
+
+import org.apache.haox.transport.event.AddressEvent;
+
+import java.net.InetSocketAddress;
+
+public class TcpAddressEvent {
+
+ public static AddressEvent createAddressBindEvent(InetSocketAddress address) {
+ return new AddressEvent(address, TcpEventType.ADDRESS_BIND);
+ }
+
+ public static AddressEvent createAddressConnectEvent(InetSocketAddress address) {
+ return new AddressEvent(address, TcpEventType.ADDRESS_CONNECT);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpConnector.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpConnector.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpConnector.java
new file mode 100644
index 0000000..13a0713
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpConnector.java
@@ -0,0 +1,75 @@
+package org.apache.haox.transport.tcp;
+
+import org.apache.haox.event.AbstractEventHandler;
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.Connector;
+import org.apache.haox.transport.Transport;
+import org.apache.haox.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+public class TcpConnector extends Connector {
+
+ public TcpConnector(StreamingDecoder streamingDecoder) {
+ this(new TcpTransportHandler(streamingDecoder));
+ }
+
+ public TcpConnector(TcpTransportHandler transportHandler) {
+ super(transportHandler);
+
+ setEventHandler(new AbstractEventHandler() {
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ if (event.getEventType() == TcpEventType.ADDRESS_CONNECT) {
+ doConnect((AddressEvent) event);
+ }
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new EventType[] {
+ TcpEventType.ADDRESS_CONNECT
+ };
+ }
+ });
+ }
+
+ @Override
+ protected void doConnect(InetSocketAddress sa) {
+ AddressEvent event = TcpAddressEvent.createAddressConnectEvent(sa);
+ dispatch(event);
+ }
+
+ private void doConnect(AddressEvent event) throws IOException {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(event.getAddress());
+ channel.register(selector,
+ SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ }
+
+ @Override
+ protected void dealKey(SelectionKey selectionKey) throws IOException {
+ if (selectionKey.isConnectable()) {
+ doConnect(selectionKey);
+ } else {
+ super.dealKey(selectionKey);
+ }
+ }
+
+ void doConnect(SelectionKey key) throws IOException {
+ SocketChannel channel = (SocketChannel) key.channel();
+ if (channel.isConnectionPending()) {
+ channel.finishConnect();
+ }
+
+ Transport transport = new TcpTransport(channel,
+ ((TcpTransportHandler) transportHandler).getStreamingDecoder());
+ channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+ onNewTransport(transport);
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpEventType.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpEventType.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpEventType.java
new file mode 100644
index 0000000..2710ddb
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpEventType.java
@@ -0,0 +1,8 @@
+package org.apache.haox.transport.tcp;
+
+import org.apache.haox.event.EventType;
+
+public enum TcpEventType implements EventType {
+ ADDRESS_BIND,
+ ADDRESS_CONNECT
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransport.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransport.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransport.java
new file mode 100644
index 0000000..03ec89c
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransport.java
@@ -0,0 +1,91 @@
+package org.apache.haox.transport.tcp;
+
+import org.apache.haox.transport.Transport;
+import org.apache.haox.transport.buffer.BufferPool;
+import org.apache.haox.transport.buffer.RecvBuffer;
+import org.apache.haox.transport.event.MessageEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+public class TcpTransport extends Transport {
+
+ private SocketChannel channel;
+
+ private StreamingDecoder streamingDecoder;
+
+ private RecvBuffer recvBuffer;
+
+ public TcpTransport(SocketChannel channel,
+ StreamingDecoder streamingDecoder) throws IOException {
+ super((InetSocketAddress) channel.getRemoteAddress());
+ this.channel = channel;
+ this.streamingDecoder = streamingDecoder;
+
+ this.recvBuffer = new RecvBuffer();
+ }
+
+ @Override
+ protected void sendOutMessage(ByteBuffer message) throws IOException {
+ channel.write(message);
+ }
+
+ public void onReadable() throws IOException {
+ ByteBuffer writeBuffer = BufferPool.allocate(65536);
+ if (channel.read(writeBuffer) <= 0) {
+ BufferPool.release(writeBuffer);
+ return;
+ }
+
+ writeBuffer.flip();
+ recvBuffer.write(writeBuffer);
+
+ WithReadDataHander rdHandler = new WithReadDataHander();
+ rdHandler.handle();
+ }
+
+ class WithReadDataHander implements DecodingCallback {
+ private ByteBuffer streamingBuffer;
+
+ @Override
+ public void onMessageComplete(int messageLength) {
+ ByteBuffer message = null;
+
+ int remaining = streamingBuffer.remaining();
+ if (remaining == messageLength) {
+ message = streamingBuffer;
+ } else if (remaining > messageLength) {
+ message = streamingBuffer.duplicate();
+ int newLimit = streamingBuffer.position() + messageLength;
+ message.limit(newLimit);
+
+ streamingBuffer.position(newLimit);
+ recvBuffer.writeFirst(streamingBuffer);
+ }
+
+ if (message != null) {
+ dispatcher.dispatch(MessageEvent.createInboundMessageEvent(TcpTransport.this, message));
+ }
+ }
+
+ @Override
+ public void onMoreDataNeeded() {
+ recvBuffer.writeFirst(streamingBuffer);
+ }
+
+ @Override
+ public void onMoreDataNeeded(int needDataLength) {
+ recvBuffer.writeFirst(streamingBuffer);
+ }
+
+ public void handle() {
+ if (recvBuffer.isEmpty()) return;
+
+ streamingBuffer = recvBuffer.readMostBytes();
+
+ streamingDecoder.decode(streamingBuffer.duplicate(), this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransportHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransportHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransportHandler.java
new file mode 100644
index 0000000..b8728b1
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/tcp/TcpTransportHandler.java
@@ -0,0 +1,58 @@
+package org.apache.haox.transport.tcp;
+
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.Transport;
+import org.apache.haox.transport.event.TransportEventType;
+import org.apache.haox.transport.TransportHandler;
+import org.apache.haox.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+public class TcpTransportHandler extends TransportHandler {
+
+ private StreamingDecoder streamingDecoder;
+
+ public TcpTransportHandler(StreamingDecoder streamingDecoder) {
+ this.streamingDecoder = streamingDecoder;
+ }
+
+ public StreamingDecoder getStreamingDecoder() {
+ return streamingDecoder;
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new TransportEventType[] {
+ TransportEventType.TRANSPORT_READABLE,
+ TransportEventType.TRANSPORT_WRITABLE
+ };
+ }
+
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ EventType eventType = event.getEventType();
+ TransportEvent te = (TransportEvent) event;
+ Transport transport = te.getTransport();
+ if (eventType == TransportEventType.TRANSPORT_READABLE) {
+ transport.onReadable();
+ } else if (eventType == TransportEventType.TRANSPORT_WRITABLE) {
+ transport.onWriteable();
+ }
+ }
+
+ @Override
+ public void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException {
+ if (selectionKey.isReadable()) {
+ selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ TcpTransport transport = (TcpTransport) selectionKey.attachment();
+ dispatch(TransportEvent.createReadableTransportEvent(transport));
+ } else if (selectionKey.isWritable()) {
+ selectionKey.interestOps(SelectionKey.OP_READ);
+ TcpTransport transport = (TcpTransport) selectionKey.attachment();
+ dispatch(TransportEvent.createWritableTransportEvent(transport));
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAcceptor.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAcceptor.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAcceptor.java
new file mode 100644
index 0000000..e666edb
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAcceptor.java
@@ -0,0 +1,65 @@
+package org.apache.haox.transport.udp;
+
+import org.apache.haox.event.AbstractEventHandler;
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.Acceptor;
+import org.apache.haox.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+public class UdpAcceptor extends Acceptor {
+
+ private DatagramChannel serverChannel;
+
+ public UdpAcceptor() {
+ this(new UdpTransportHandler());
+ }
+
+ public UdpAcceptor(UdpTransportHandler udpTransportHandler) {
+ super(udpTransportHandler);
+
+ setEventHandler(new AbstractEventHandler() {
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ if (event.getEventType() == UdpEventType.ADDRESS_BIND) {
+ doBind((AddressEvent) event);
+ }
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new EventType[] {
+ UdpEventType.ADDRESS_BIND
+ };
+ }
+ });
+ }
+
+ @Override
+ protected void doListen(InetSocketAddress socketAddress) {
+ AddressEvent event = UdpAddressEvent.createAddressBindEvent(socketAddress);
+ dispatch(event);
+ }
+
+ private void doBind(AddressEvent event) throws IOException {
+ serverChannel = DatagramChannel.open();
+ serverChannel.configureBlocking(false);
+ serverChannel.bind(event.getAddress());
+ serverChannel.register(selector, SelectionKey.OP_READ);
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+
+ try {
+ serverChannel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAddressEvent.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAddressEvent.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAddressEvent.java
new file mode 100644
index 0000000..83459f5
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpAddressEvent.java
@@ -0,0 +1,17 @@
+package org.apache.haox.transport.udp;
+
+import org.apache.haox.transport.event.AddressEvent;
+
+import java.net.InetSocketAddress;
+
+public class UdpAddressEvent {
+
+ public static AddressEvent createAddressBindEvent(InetSocketAddress address) {
+ return new AddressEvent(address, UdpEventType.ADDRESS_BIND);
+ }
+
+ public static AddressEvent createAddressConnectEvent(InetSocketAddress address) {
+ return new AddressEvent(address, UdpEventType.ADDRESS_CONNECT);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpChannelEvent.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpChannelEvent.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpChannelEvent.java
new file mode 100644
index 0000000..ace00d6
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpChannelEvent.java
@@ -0,0 +1,28 @@
+package org.apache.haox.transport.udp;
+
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+
+import java.nio.channels.DatagramChannel;
+
+public class UdpChannelEvent extends Event {
+
+ private DatagramChannel channel;
+
+ private UdpChannelEvent(DatagramChannel channel, EventType eventType) {
+ super(eventType);
+ this.channel = channel;
+ }
+
+ public DatagramChannel getChannel() {
+ return channel;
+ }
+
+ public static UdpChannelEvent makeWritableChannelEvent(DatagramChannel channel) {
+ return new UdpChannelEvent(channel, UdpEventType.CHANNEL_WRITABLE);
+ }
+
+ public static UdpChannelEvent makeReadableChannelEvent(DatagramChannel channel) {
+ return new UdpChannelEvent(channel, UdpEventType.CHANNEL_READABLE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpConnector.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpConnector.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpConnector.java
new file mode 100644
index 0000000..c3419a9
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpConnector.java
@@ -0,0 +1,57 @@
+package org.apache.haox.transport.udp;
+
+import org.apache.haox.event.AbstractEventHandler;
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.Connector;
+import org.apache.haox.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+public class UdpConnector extends Connector {
+
+ public UdpConnector() {
+ this(new UdpTransportHandler());
+ }
+
+ public UdpConnector(UdpTransportHandler transportHandler) {
+ super(transportHandler);
+
+ setEventHandler(new AbstractEventHandler() {
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ if (event.getEventType() == UdpEventType.ADDRESS_CONNECT) {
+ doConnect((AddressEvent) event);
+ }
+ }
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new EventType[] {
+ UdpEventType.ADDRESS_CONNECT
+ };
+ }
+ });
+ }
+
+ @Override
+ protected void doConnect(InetSocketAddress sa) {
+ AddressEvent event = UdpAddressEvent.createAddressConnectEvent(sa);
+ dispatch(event);
+ }
+
+ private void doConnect(AddressEvent event) throws IOException {
+ InetSocketAddress address = event.getAddress();
+ DatagramChannel channel = DatagramChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(address);
+
+ channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+
+ UdpTransport transport = new UdpTransport(channel, address);
+ onNewTransport(transport);
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpEventType.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpEventType.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpEventType.java
new file mode 100644
index 0000000..c8f07b2
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpEventType.java
@@ -0,0 +1,10 @@
+package org.apache.haox.transport.udp;
+
+import org.apache.haox.event.EventType;
+
+public enum UdpEventType implements EventType {
+ ADDRESS_BIND,
+ ADDRESS_CONNECT,
+ CHANNEL_WRITABLE,
+ CHANNEL_READABLE
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransport.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransport.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransport.java
new file mode 100644
index 0000000..78007a4
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransport.java
@@ -0,0 +1,46 @@
+package org.apache.haox.transport.udp;
+
+import org.apache.haox.transport.Transport;
+import org.apache.haox.transport.buffer.TransBuffer;
+import org.apache.haox.transport.event.MessageEvent;
+import org.apache.haox.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+public class UdpTransport extends Transport {
+ private DatagramChannel channel;
+
+ protected TransBuffer recvBuffer;
+
+ public UdpTransport(DatagramChannel channel,
+ InetSocketAddress remoteAddress) {
+ super(remoteAddress);
+ this.channel = channel;
+ this.recvBuffer = new TransBuffer();
+ }
+
+ protected void onRecvData(ByteBuffer data) {
+ if (data != null) {
+ recvBuffer.write(data);
+ dispatcher.dispatch(TransportEvent.createReadableTransportEvent(this));
+ }
+ }
+
+ @Override
+ public void onReadable() throws IOException {
+ super.onReadable();
+
+ if (! recvBuffer.isEmpty()) {
+ ByteBuffer message = recvBuffer.read();
+ dispatcher.dispatch(MessageEvent.createInboundMessageEvent(this, message));
+ }
+ }
+
+ @Override
+ protected void sendOutMessage(ByteBuffer message) throws IOException {
+ channel.send(message, getRemoteAddress());
+ }
+}
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransportHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransportHandler.java b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransportHandler.java
new file mode 100644
index 0000000..1c296ab
--- /dev/null
+++ b/contrib/haox-event/src/main/java/org/apache/haox/transport/udp/UdpTransportHandler.java
@@ -0,0 +1,90 @@
+package org.apache.haox.transport.udp;
+
+import org.apache.haox.event.Event;
+import org.apache.haox.event.EventType;
+import org.apache.haox.transport.Transport;
+import org.apache.haox.transport.TransportHandler;
+import org.apache.haox.transport.event.TransportEvent;
+import org.apache.haox.transport.event.TransportEventType;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.util.HashMap;
+import java.util.Map;
+
+public class UdpTransportHandler extends TransportHandler {
+
+ protected Map<InetSocketAddress, UdpTransport> transports =
+ new HashMap<InetSocketAddress, UdpTransport>();
+
+ @Override
+ public EventType[] getInterestedEvents() {
+ return new EventType[] {
+ UdpEventType.CHANNEL_READABLE,
+ TransportEventType.TRANSPORT_WRITABLE,
+ TransportEventType.TRANSPORT_READABLE,
+ TransportEventType.NEW_TRANSPORT
+ };
+ }
+
+ @Override
+ protected void doHandle(Event event) throws Exception {
+ EventType eventType = event.getEventType();
+ if (eventType == UdpEventType.CHANNEL_READABLE) {
+ UdpChannelEvent ce = (UdpChannelEvent) event;
+ DatagramChannel channel = ce.getChannel();
+ doRead(channel);
+ } else if (eventType == TransportEventType.TRANSPORT_READABLE) {
+ TransportEvent te = (TransportEvent) event;
+ Transport transport = te.getTransport();
+ transport.onReadable();
+ } else if (eventType == TransportEventType.TRANSPORT_WRITABLE) {
+ TransportEvent te = (TransportEvent) event;
+ Transport transport = te.getTransport();
+ transport.onWriteable();
+ } else if (eventType == TransportEventType.NEW_TRANSPORT) {
+ TransportEvent te = (TransportEvent) event;
+ Transport transport = te.getTransport();
+ if (transport instanceof UdpTransport) {
+ InetSocketAddress remoteAddress = transport.getRemoteAddress();
+ if (! transports.containsKey(remoteAddress)) {
+ transports.put(remoteAddress, (UdpTransport) transport);
+ }
+ }
+ }
+ }
+
+ private void doRead(DatagramChannel channel) throws IOException {
+ ByteBuffer recvBuffer = ByteBuffer.allocate(65536); // to optimize
+ InetSocketAddress fromAddress = (InetSocketAddress) channel.receive(recvBuffer);
+ if (fromAddress != null) {
+ recvBuffer.flip();
+ UdpTransport transport = transports.get(fromAddress);
+ if (transport == null) {
+ // should be from acceptor
+ transport = new UdpTransport(channel, fromAddress);
+ transport.setDispatcher(getDispatcher());
+ dispatch(TransportEvent.createNewTransportEvent(transport));
+ }
+ transport.onRecvData(recvBuffer);
+ }
+ }
+
+ @Override
+ public void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException {
+ DatagramChannel channel =
+ (DatagramChannel) selectionKey.channel();
+
+ if (selectionKey.isReadable()) {
+ dispatch(UdpChannelEvent.makeReadableChannelEvent(channel));
+ } else if (selectionKey.isWritable()) {
+ dispatch(UdpChannelEvent.makeWritableChannelEvent(channel));
+ }
+ // Udp channel is always writable, so not usable
+ selectionKey.interestOps(SelectionKey.OP_READ);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/haox/event/AbstractEventHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/haox/event/AbstractEventHandler.java b/contrib/haox-event/src/main/java/org/haox/event/AbstractEventHandler.java
deleted file mode 100644
index 2a63d30..0000000
--- a/contrib/haox-event/src/main/java/org/haox/event/AbstractEventHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.haox.event;
-
-public abstract class AbstractEventHandler implements EventHandler {
-
- private Dispatcher dispatcher;
-
- public AbstractEventHandler() {
-
- }
-
- protected void dispatch(Event event) {
- dispatcher.dispatch(event);
- }
-
- @Override
- public Dispatcher getDispatcher() {
- return dispatcher;
- }
-
- @Override
- public void setDispatcher(Dispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
-
- @Override
- public void handle(Event event) {
- try {
- doHandle(event);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- protected abstract void doHandle(Event event) throws Exception;
-}
-
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/haox/event/AbstractInternalEventHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/haox/event/AbstractInternalEventHandler.java b/contrib/haox-event/src/main/java/org/haox/event/AbstractInternalEventHandler.java
deleted file mode 100644
index 8f3d7cc..0000000
--- a/contrib/haox-event/src/main/java/org/haox/event/AbstractInternalEventHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.haox.event;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public abstract class AbstractInternalEventHandler extends AbstractEventHandler
- implements InternalEventHandler {
-
- private int id = -1;
- protected EventHandler handler;
-
- private static AtomicInteger idGen = new AtomicInteger(1);
-
- public AbstractInternalEventHandler() {
- super();
-
- this.id = idGen.getAndIncrement();
-
- init();
- }
-
- public AbstractInternalEventHandler(EventHandler handler) {
- this();
-
- this.handler = handler;
- }
-
- protected void setEventHandler(EventHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public int id() {
- return id;
- }
-
- public abstract void init();
-
- protected void process(Event event) {
- handler.handle(event);
- }
-
- @Override
- public EventType[] getInterestedEvents() {
- return handler.getInterestedEvents();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/haox/event/BufferedEventHandler.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/haox/event/BufferedEventHandler.java b/contrib/haox-event/src/main/java/org/haox/event/BufferedEventHandler.java
deleted file mode 100644
index d38f9ef..0000000
--- a/contrib/haox-event/src/main/java/org/haox/event/BufferedEventHandler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.haox.event;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * An EventHandler wrapper buffering events and processing them later
- */
-public abstract class BufferedEventHandler extends AbstractInternalEventHandler {
-
- protected BlockingQueue<Event> eventQueue;
-
- public BufferedEventHandler(EventHandler handler) {
- super(handler);
- }
-
- public BufferedEventHandler() {
- super();
- }
-
- @Override
- public void init() {
- this.eventQueue = new ArrayBlockingQueue<Event>(2);
- }
-
- @Override
- protected void doHandle(Event event) throws Exception {
- try {
- eventQueue.put(event);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/directory-kerberos/blob/5a980a4d/contrib/haox-event/src/main/java/org/haox/event/Dispatcher.java
----------------------------------------------------------------------
diff --git a/contrib/haox-event/src/main/java/org/haox/event/Dispatcher.java b/contrib/haox-event/src/main/java/org/haox/event/Dispatcher.java
deleted file mode 100644
index 3d6797b..0000000
--- a/contrib/haox-event/src/main/java/org/haox/event/Dispatcher.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.haox.event;
-
-public interface Dispatcher {
-
- public void dispatch(Event event);
-
- public void register(EventHandler handler);
-
- public void register(InternalEventHandler internalHandler);
-}