You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by bl...@apache.org on 2004/12/09 17:43:27 UTC

svn commit: r111404 - in incubator/directory/seda/branches/berin_api_proposal/src: java/org/apache/directory/seda/bufferpool java/org/apache/directory/seda/input java/org/apache/directory/seda/output test/org/apache/directory/seda/bufferpool/test test/org/apache/directory/seda/output/test

Author: bloritsch
Date: Thu Dec  9 08:43:24 2004
New Revision: 111404

URL: http://svn.apache.org/viewcvs?view=rev&rev=111404
Log:
Add some meat to the Writer stage.  Unfortunately, I can't seem to get it to emit what I expect...
Modified:
   incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java
   incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java
   incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java
   incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java
   incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java

Modified: incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java&r2=111404
==============================================================================
--- incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java	(original)
+++ incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/bufferpool/BufferPool.java	Thu Dec  9 08:43:24 2004
@@ -59,8 +59,8 @@
     public static void putBuffer( final ByteBuffer buffer )
     {
         m_usedBuffers--;
-        assert buffer.isDirect();
-        assert KILOBYTE == buffer.capacity();
+        assert buffer.isDirect() && KILOBYTE == buffer.capacity() : "The provided buffer could not have been from this pool.";
+        buffer.clear();
         m_reserveBuffers.add( new WeakReference(buffer) );
     }
 

Modified: incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java&r2=111404
==============================================================================
--- incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java	(original)
+++ incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/input/ReaderSource.java	Thu Dec  9 08:43:24 2004
@@ -67,6 +67,7 @@
 
                 final ByteBuffer buffer = BufferPool.getBuffer();
                 event.channel().read( buffer );
+                buffer.flip();
                 event.setBuffer( buffer );
             }
         }

Modified: incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java&r2=111404
==============================================================================
--- incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java	(original)
+++ incubator/directory/seda/branches/berin_api_proposal/src/java/org/apache/directory/seda/output/Writer.java	Thu Dec  9 08:43:24 2004
@@ -16,18 +16,19 @@
  */
 package org.apache.directory.seda.output;
 
-import org.d_haven.event.Source;
-import org.d_haven.event.Sink;
-import org.d_haven.event.SinkException;
-import org.d_haven.event.PreparedEnqueue;
 import org.apache.directory.seda.Stage;
+import org.apache.directory.seda.NetworkEvent;
+import org.apache.directory.seda.bufferpool.BufferPool;
 
 import java.nio.channels.SocketChannel;
 import java.nio.channels.Selector;
 import java.nio.channels.SelectionKey;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.LinkedList;
 
 /**
  * Created by IntelliJ IDEA. User: berin Date: Dec 8, 2004 Time: 8:29:38
@@ -36,10 +37,91 @@
 public class Writer extends Stage
 {
     private Selector m_selector;
+    private Map m_cachedEvents;
 
     public Writer() throws IOException
     {
         m_selector = Selector.open();
+        m_cachedEvents = new HashMap();
+    }
+
+    public void handleEvent(final Object event)
+    {
+        if ( event instanceof NetworkEvent )
+        {
+            final NetworkEvent netEvent = (NetworkEvent) event;
+
+            try
+            {
+                if ( m_selector.selectNow() > 0 )
+                {
+                    final Iterator it = m_selector.selectedKeys().iterator();
+
+                    while (it.hasNext())
+                    {
+                        final SelectionKey key = (SelectionKey)it.next();
+                        final SocketChannel channel = (SocketChannel) key.channel();
+                        sendCachedEvents( key, channel );
+                        sendOrCache( netEvent, channel );
+                    }
+                }
+            }
+            catch ( IOException e )
+            {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+        else
+        {
+            super.handleEvent( event );
+        }
+    }
+
+    private void sendOrCache( final NetworkEvent netEvent,
+                              final SocketChannel channel )
+            throws IOException
+    {
+        if ( netEvent.channel().equals(channel) )
+        {
+            sendEvent( channel, netEvent );
+        }
+        else
+        {
+            Collection cached = (Collection)m_cachedEvents.get( channel );
+            if ( cached == null )
+            {
+                cached = new LinkedList();
+                m_cachedEvents.put( channel, cached );
+            }
+
+            cached.add( netEvent );
+        }
+    }
+
+    private void sendEvent( final SocketChannel channel,
+                            final NetworkEvent netEvent )
+            throws IOException
+    {
+        channel.write( netEvent.getBuffer() );
+        BufferPool.putBuffer( netEvent.getBuffer() );
+        netEvent.setBuffer(null);
+    }
+
+    private void sendCachedEvents( final SelectionKey key,
+                                   final SocketChannel channel )
+            throws IOException
+    {
+        final Collection cached = (Collection)m_cachedEvents.get(key.channel());
+
+        if (cached != null)
+        {
+            final Iterator it = cached.iterator();
+            while(it.hasNext())
+            {
+                final NetworkEvent netEvent = (NetworkEvent)it.next();
+                sendEvent( channel, netEvent );
+            }
+        }
     }
 
     public void connect( final SocketChannel client ) throws IOException

Modified: incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java&r2=111404
==============================================================================
--- incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java	(original)
+++ incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/bufferpool/test/TestBufferPool.java	Thu Dec  9 08:43:24 2004
@@ -29,6 +29,7 @@
 public class TestBufferPool extends TestCase
 {
     private static final int KB = 1024;
+    private static final int TOO_SMALL = 256;
 
     public TestBufferPool(final String name)
     {
@@ -81,5 +82,45 @@
         BufferPool.putBuffer( buffer2 );
 
         assertEquals( 1, BufferPool.buffersInReserve() );
+    }
+
+    public void testPutBuffer_notDirect()
+    {
+        boolean assertionsEnabled = false;
+        assert assertionsEnabled = true;
+
+        if (assertionsEnabled)
+        {
+            try
+            {
+                BufferPool.putBuffer( ByteBuffer.allocate( KB ) );
+                fail("Did not throw AssertionError as expected.");
+            }
+            catch (Error e)
+            {
+                assertTrue("Caught error, but not the expected type" + e, e instanceof AssertionError);
+                if ( ! (e instanceof AssertionError ) ) throw e;
+            }
+        }
+    }
+
+    public void testPutBuffer_tooSmall()
+    {
+        boolean assertionsEnabled = false;
+        assert assertionsEnabled = true;
+
+        if (assertionsEnabled)
+        {
+            try
+            {
+                BufferPool.putBuffer( ByteBuffer.allocateDirect( TOO_SMALL ) );
+                fail("Did not throw AssertionError as expected.");
+            }
+            catch (Error e)
+            {
+                assertTrue("Caught error, but not the expected type" + e, e instanceof AssertionError);
+                if ( ! (e instanceof AssertionError ) ) throw e;
+            }
+        }
     }
 }

Modified: incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java?view=diff&rev=111404&p1=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java&r1=111403&p2=incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java&r2=111404
==============================================================================
--- incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java	(original)
+++ incubator/directory/seda/branches/berin_api_proposal/src/test/org/apache/directory/seda/output/test/TestWriter.java	Thu Dec  9 08:43:24 2004
@@ -18,8 +18,15 @@
 
 import junit.framework.TestCase;
 import org.apache.directory.seda.output.Writer;
+import org.apache.directory.seda.NetworkEvent;
+import org.apache.directory.seda.bufferpool.BufferPool;
 
 import java.io.IOException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
 
 /**
  * Created by IntelliJ IDEA. User: berin Date: Dec 9, 2004 Time:
@@ -28,15 +35,73 @@
  */
 public class TestWriter extends TestCase
 {
+    private ServerSocketChannel m_serverChannel;
+    private static final int PORT = 6666;
+    private SocketChannel m_channel;
+    private SocketChannel m_clientChannel;
+    private Writer m_writer;
+
     public TestWriter(final String name)
     {
         super(name);
     }
 
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        m_serverChannel = ServerSocketChannel.open();
+        m_serverChannel.socket().bind( new InetSocketAddress(PORT) );
+
+        m_channel = SocketChannel.open();
+        m_channel.configureBlocking( false );
+        m_channel.connect(  new InetSocketAddress(InetAddress.getLocalHost(), PORT) );
+
+        m_clientChannel = m_serverChannel.accept();
+        m_channel.finishConnect();
+        m_channel.configureBlocking( true );
+
+        m_writer = new Writer();
+        m_writer.connect( m_clientChannel );
+    }
+
+    public void tearDown() throws Exception
+    {
+        super.tearDown();
+
+        m_writer.disconnect(m_clientChannel);
+        m_writer.close();
+
+        m_clientChannel.close();
+        m_serverChannel.close();
+        m_channel.close();
+    }
+
     public void testCreate() throws IOException
     {
         final Writer writer = new Writer();
 
         assertNotNull(writer);
+    }
+
+    public void testSendNetworkEvent() throws IOException
+    {
+        final NetworkEvent event = new NetworkEvent(m_clientChannel);
+        event.setBuffer( BufferPool.getBuffer() );
+        event.getBuffer().put( "test".getBytes() ).flip();
+        m_writer.handleEvent( event );
+
+        final ByteBuffer buffer = ByteBuffer.allocate( 20 );
+        m_channel.read( buffer );
+        buffer.flip();
+
+        assertEquals("test", buffer.asCharBuffer().toString());
+    }
+
+    public void testSendIncorrectEvent()
+    {
+        m_writer.handleEvent( "test" );
+        assertEquals("test", m_writer.getDefaultPipe().dequeue() );
     }
 }