You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/12/08 16:40:32 UTC

svn commit: r111280 - in incubator/directory/seda/branches/trustin/src: examples/org/apache/mina/examples/echoserver examples/org/apache/mina/examples/netcat java/org/apache/mina/core java/org/apache/mina/core/socket java/org/apache/mina/protocol java/org/apache/mina/util

Author: trustin
Date: Wed Dec  8 07:40:31 2004
New Revision: 111280

URL: http://svn.apache.org/viewcvs?view=rev&rev=111280
Log:
Added WriteBuffer.putMarker() and SessionHandler.markerReleased().
Once a marker is put, it is released when all data that was put into the write buffer before the marker has been flushed to the channel.
I used this technique to implement the messageSent event more accurately, and it works great.
Modified:
   incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java
   incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java

Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java	(original)
+++ incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/echoserver/EchoServerSessionHandler.java	Wed Dec  8 07:40:31 2004
@@ -77,4 +77,7 @@
             rb.signal();
         }
     }
+
+	public void markerReleased(Session session, Object marker) {
+	}
 }

Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java	(original)
+++ incubator/directory/seda/branches/trustin/src/examples/org/apache/mina/examples/netcat/NetCatSessionHandler.java	Wed Dec  8 07:40:31 2004
@@ -59,4 +59,7 @@
 
     public void dataWritten(Session session, int writtenBytes) {
     }
+
+	public void markerReleased(Session session, Object marker) {
+	}
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/SessionHandler.java	Wed Dec  8 07:40:31 2004
@@ -39,4 +39,6 @@
     void dataRead(Session session, int readBytes);
 
     void dataWritten(Session session, int writtenBytes);
+    
+    void markerReleased(Session session, Object marker);
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/WriteBuffer.java	Wed Dec  8 07:40:31 2004
@@ -71,4 +71,6 @@
     ByteBuffer asByteBuffer();
 
     WriteBuffer flush();
+    
+    WriteBuffer putMarker(Object marker);
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpIoProcessor.java	Wed Dec  8 07:40:31 2004
@@ -20,12 +20,10 @@
 package org.apache.mina.core.socket;
 
 import java.io.IOException;
-
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-
 import java.util.Iterator;
 import java.util.Set;
 
@@ -342,6 +340,26 @@
                 if (writtenBytes > 0) {
                     session.increaseWrittenBytes(writtenBytes);
                     fireDataWritten(session, writtenBytes);
+                    Queue markers = lock.getMarkers();
+                    for (;;) {
+                    	TcpWriteBuffer.Marker marker = (TcpWriteBuffer.Marker) markers.first();
+                    	if (marker == null)
+                    		break;
+
+                    	int bytesLeft = marker.getBytesLeft();
+                    	if (bytesLeft > writtenBytes) {
+                    		marker.setBytesLeft(bytesLeft - writtenBytes);
+                    		break;
+                    	} else if (bytesLeft == writtenBytes) {
+                    		markers.pop();
+                    		fireMarkerRemoved(session, marker.getValue());
+                    		break;
+                    	} else {
+                    		markers.pop();
+                    		fireMarkerRemoved(session, marker.getValue());
+                    		writtenBytes -= bytesLeft;
+                    	}
+                    }
                 }
             }
         } catch (IOException e) {
@@ -384,6 +402,14 @@
     private void fireDataWritten(TcpSession session, int writtenBytes) {
         try {
             session.getHandler().dataWritten(session, writtenBytes);
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireMarkerRemoved(TcpSession session, Object marker) {
+        try {
+            session.getHandler().markerReleased(session, marker);
         } catch (Throwable e) {
             fireExceptionCaught(session, e);
         }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpSession.java	Wed Dec  8 07:40:31 2004
@@ -111,7 +111,7 @@
     void flush() {
         TcpIoProcessor.getInstance().flushSession(this);
     }
-
+    
     public boolean isConnected() {
         return ch.isConnected();
     }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/core/socket/TcpWriteBuffer.java	Wed Dec  8 07:40:31 2004
@@ -24,6 +24,7 @@
 
 import org.apache.mina.core.ReadBuffer;
 import org.apache.mina.core.WriteBuffer;
+import org.apache.mina.util.Queue;
 
 
 /**
@@ -35,6 +36,7 @@
 class TcpWriteBuffer implements WriteBuffer {
     private final TcpSession session;
     private final ByteBuffer buf;
+    private final Queue markers = new Queue(16);
 
     TcpWriteBuffer(TcpSession session, ByteBuffer buf) {
         this.session = session;
@@ -122,7 +124,7 @@
         session.flush();
         return this;
     }
-
+    
     public boolean hasRemaining() {
         return buf.hasRemaining();
     }
@@ -148,5 +150,43 @@
     public WriteBuffer reset() {
         buf.reset();
         return this;
+    }
+    
+    Queue getMarkers() {
+    	return markers;
+    }
+    
+    public WriteBuffer putMarker(Object marker) {
+    	int bytesLeft;
+    	if (markers.isEmpty()) {
+    		bytesLeft = buf.position();
+    	} else {
+    		bytesLeft = buf.position() - ((Marker) markers.last()).getBytesLeft();
+    	}
+    	
+    	markers.push(new Marker(marker, bytesLeft));
+    	return this;
+    }
+    
+    static class Marker {
+    	private final Object value;
+    	private int bytesLeft;
+    	
+    	private Marker(Object value, int bytesLeft) {
+    		this.value = value;
+    		this.bytesLeft = bytesLeft;
+    	}
+    	
+    	public Object getValue() {
+    		return value;
+    	}
+    	
+    	public int getBytesLeft() {
+    		return bytesLeft;
+    	}
+    	
+    	public void setBytesLeft(int bytesLeft) {
+    		this.bytesLeft = bytesLeft;
+    	}
     }
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/protocol/CoreAdapter.java	Wed Dec  8 07:40:31 2004
@@ -110,6 +110,10 @@
             write(session);
         }
 
+		public void markerReleased(Session session, Object marker) {
+			fireMessageSent((ProtocolSession) session.getAttachment(), marker);
+		}
+		
         private void write(Session session) {
             ProtocolSessionImpl psession =
                 (ProtocolSessionImpl) session.getAttachment();
@@ -124,9 +128,8 @@
                 while (!writeQueue.isEmpty()) {
                     synchronized (out) {
                         if (codec.encode(psession, writeQueue.first(), out)) {
+                        	out.putMarker(writeQueue.pop());
                             out.flush();
-                            // FIXME The message is not actually written.
-                            fireMessageSent(psession, writeQueue.pop());
                         } else {
                             out.flush();
                             break;

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java?view=diff&rev=111280&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java&r1=111279&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java&r2=111280
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/mina/util/Queue.java	Wed Dec  8 07:40:31 2004
@@ -118,6 +118,14 @@
 
         return items[first];
     }
+    
+    public Object last() {
+    	if (size == 0) {
+    		return null;
+    	}
+    	
+    	return items[(last + items.length - 1) % items.length];
+    }
 
     /**
      * Returns <code>true</code> if the queue is empty.