You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/12/08 13:50:57 UTC

svn commit: r111268 - incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol

Author: trustin
Date: Wed Dec  8 04:50:53 2004
New Revision: 111268

URL: http://svn.apache.org/viewcvs?view=rev&rev=111268
Log:
 * Added 'reverser' example server which demonstrates protocol layer.
 * Implemented protocol layer (see FIXME)
Added:
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java   (contents, props changed)
Modified:
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolCodec.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSession.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSessionHandler.java

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java?view=auto&rev=111268
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java	Wed Dec  8 04:50:53 2004
@@ -0,0 +1,262 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.mina.protocol;
+
+import java.net.SocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.core.IdleStatus;
+import org.apache.mina.core.ReadBuffer;
+import org.apache.mina.core.Session;
+import org.apache.mina.core.SessionConfig;
+import org.apache.mina.core.SessionHandler;
+import org.apache.mina.core.WriteBuffer;
+import org.apache.mina.util.Queue;
+
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class CoreAdapter {
+    private static final Log log = LogFactory.getLog(CoreAdapter.class);
+
+    public static SessionHandler adapt(ProtocolProvider protocolProvider) {
+        return new SessionHandlerAdapter(protocolProvider);
+    }
+
+    private static class SessionHandlerAdapter implements SessionHandler {
+        private final ProtocolCodec codec;
+        private final ProtocolSessionHandler handler;
+
+        public SessionHandlerAdapter(ProtocolProvider protocolProvider) {
+            this.codec = protocolProvider.newCodec();
+            this.handler = protocolProvider.getHandler();
+        }
+
+        public void sessionOpened(Session session) {
+            ProtocolSession psession = new ProtocolSessionImpl(session, this);
+            session.setAttachment(psession);
+            fireSessionOpened(psession);
+        }
+
+        public void sessionClosed(Session session) {
+            fireSessionClosed((ProtocolSession) session.getAttachment());
+        }
+
+        public void sessionIdle(Session session, IdleStatus status) {
+            fireSessionIdle((ProtocolSession) session.getAttachment(), status);
+        }
+
+        public void exceptionCaught(Session session, Throwable cause) {
+            fireExceptionCaught((ProtocolSession) session.getAttachment(),
+                                cause);
+        }
+
+        public void dataRead(Session session, int readBytes) {
+            ProtocolSession psession =
+                (ProtocolSession) session.getAttachment();
+            ReadBuffer in = session.getReadBuffer();
+            int sizeBefore;
+            int sizeAfter;
+            Object result;
+
+            try {
+                do {
+                    result = null;
+
+                    synchronized (in) {
+                        sizeBefore = in.remaining();
+                        result = codec.decode(psession, in);
+                        sizeAfter = in.remaining();
+                    }
+
+                    if (sizeBefore != sizeAfter) {
+                        in.signal();
+                    }
+
+                    if (result != null) {
+                        fireMessageReceived(psession, result);
+                    } else {
+                        break;
+                    }
+                } while (sizeAfter > 0);
+            } catch (Throwable t) {
+                fireExceptionCaught(psession, t);
+            }
+        }
+
+        public void dataWritten(Session session, int writtenBytes) {
+            write(session);
+        }
+
+        private void write(Session session) {
+            ProtocolSessionImpl psession =
+                (ProtocolSessionImpl) session.getAttachment();
+            Queue writeQueue = psession.writeQueue;
+
+            if (writeQueue.isEmpty())
+                return;
+
+            WriteBuffer out = session.getWriteBuffer();
+
+            try {
+                while (!writeQueue.isEmpty()) {
+                    synchronized (out) {
+                        if (codec.encode(psession, writeQueue.first(), out)) {
+                            out.flush();
+                            // FIXME The message is not actually written.
+                            fireMessageSent(psession, writeQueue.pop());
+                        } else {
+                            out.flush();
+                            break;
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                fireExceptionCaught(psession, t);
+            }
+        }
+
+        private void fireSessionOpened(ProtocolSession session) {
+            try {
+                handler.sessionOpened(session);
+            } catch (Throwable t) {
+                fireExceptionCaught(session, t);
+            }
+        }
+
+        private void fireSessionClosed(ProtocolSession session) {
+            try {
+                handler.sessionClosed(session);
+            } catch (Throwable t) {
+                fireExceptionCaught(session, t);
+            }
+        }
+
+        private void fireSessionIdle(ProtocolSession session,
+                                     IdleStatus idleStatus) {
+            try {
+                handler.sessionIdle(session, idleStatus);
+            } catch (Throwable t) {
+                fireExceptionCaught(session, t);
+            }
+        }
+
+        private void fireMessageReceived(ProtocolSession session,
+                                         Object message) {
+            try {
+                handler.messageReceived(session, message);
+            } catch (Throwable t) {
+                fireExceptionCaught(session, t);
+            }
+        }
+
+        private void fireMessageSent(ProtocolSession session, Object message) {
+            try {
+                handler.messageSent(session, message);
+            } catch (Throwable t) {
+                fireExceptionCaught(session, t);
+            }
+        }
+
+        private void fireExceptionCaught(ProtocolSession session,
+                                         Throwable cause) {
+            try {
+                handler.exceptionCaught(session, cause);
+            } catch (Throwable t) {
+                log.error("Exception from excaptionCaught.", t);
+            }
+        }
+    }
+
+    private static class ProtocolSessionImpl implements ProtocolSession {
+        private final Session session;
+        private final SessionHandlerAdapter adapter;
+        private final Queue writeQueue = new Queue(16);
+        private Object attachment;
+
+        private ProtocolSessionImpl(Session session,
+                                    SessionHandlerAdapter adapter) {
+            this.session = session;
+            this.adapter = adapter;
+        }
+
+        public void close() {
+            session.close();
+        }
+
+        public Object getAttachment() {
+            return attachment;
+        }
+
+        public void setAttachment(Object attachment) {
+            this.attachment = attachment;
+        }
+
+        public boolean write(Object message) {
+            synchronized (session.getWriteBuffer()) {
+                writeQueue.push(message);
+            }
+
+            adapter.write(session);
+            return true;
+        }
+
+        public boolean isConnected() {
+            return session.isConnected();
+        }
+
+        public boolean isClosed() {
+            return session.isClosed();
+        }
+
+        public SessionConfig getConfig() {
+            return session.getConfig();
+        }
+
+        public SocketAddress getRemoteAddress() {
+            return session.getRemoteAddress();
+        }
+
+        public SocketAddress getLocalAddress() {
+            return session.getLocalAddress();
+        }
+
+        public long getLastIoTime() {
+            return session.getLastIoTime();
+        }
+
+        public long getLastReadTime() {
+            return session.getLastReadTime();
+        }
+
+        public long getLastWriteTime() {
+            return session.getLastWriteTime();
+        }
+
+        public boolean isIdle(IdleStatus status) {
+            return session.isIdle(status);
+        }
+    }
+}

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolCodec.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolCodec.java?view=diff&rev=111268&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolCodec.java&r1=111267&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolCodec.java&r2=111268
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolCodec.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolCodec.java	Wed Dec  8 04:50:53 2004
@@ -19,7 +19,8 @@
  */
 package org.apache.mina.protocol;
 
-import java.nio.ByteBuffer;
+import org.apache.mina.core.ReadBuffer;
+import org.apache.mina.core.WriteBuffer;
 
 
 /**
@@ -29,8 +30,9 @@
  * @version $Rev$, $Date$
  */
 public interface ProtocolCodec {
-    boolean encode(ProtocolSession session, Object message, ByteBuffer out);
+    boolean encode(ProtocolSession session, Object message, WriteBuffer out)
+           throws ProtocolViolationException;
 
-    Object decode(ProtocolSession session, ByteBuffer in)
+    Object decode(ProtocolSession session, ReadBuffer in)
            throws ProtocolViolationException;
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSession.java?view=diff&rev=111268&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSession.java&r1=111267&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSession.java&r2=111268
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSession.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSession.java	Wed Dec  8 04:50:53 2004
@@ -39,9 +39,7 @@
     void setAttachment(Object attachment);
 
     boolean write(Object message);
-
-    void notifyWhenWritable();
-
+    
     boolean isConnected();
 
     boolean isClosed();

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSessionHandler.java?view=diff&rev=111268&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSessionHandler.java&r1=111267&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSessionHandler.java&r2=111268
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSessionHandler.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/ProtocolSessionHandler.java	Wed Dec  8 04:50:53 2004
@@ -40,6 +40,4 @@
     void messageReceived(ProtocolSession session, Object message);
 
     void messageSent(ProtocolSession session, Object message);
-
-    void sessionWritable(ProtocolSession session);
 }