You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ak...@apache.org on 2004/09/16 09:02:30 UTC

svn commit: rev 46164 - in incubator/directory/seda/trunk: . src/examples/org/apache/seda/examples src/java/org/apache/seda src/java/org/apache/seda/decoder src/java/org/apache/seda/encoder src/java/org/apache/seda/listener src/java/org/apache/seda/protocol src/test/org/apache/seda

Author: akarasulu
Date: Thu Sep 16 00:02:27 2004
New Revision: 46164

Added:
   incubator/directory/seda/trunk/src/examples/org/apache/seda/examples/EchoProtocolProvider.java   (contents, props changed)
Modified:
   incubator/directory/seda/trunk/   (props changed)
   incubator/directory/seda/trunk/project.xml
   incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java
   incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java
Log:
Commit changes ...

 o added new Echo ProtocolHandler
 o added test case for handler
 o started using commons-net for testing server
 o fixed a race condition between ConnectEvent and first InputEvent processing
 o added missing router subscriptions on EncoderManager
 o added missing code to resolve a protocol and request handler in ReqProc



Modified: incubator/directory/seda/trunk/project.xml
==============================================================================
--- incubator/directory/seda/trunk/project.xml	(original)
+++ incubator/directory/seda/trunk/project.xml	Thu Sep 16 00:02:27 2004
@@ -125,6 +125,13 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-net</groupId>
+      <artifactId>commons-net</artifactId>
+      <version>1.2.1</version>
+      <url>http://jakarta.apache.org/commons/net</url>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>3.8.1</version>

Added: incubator/directory/seda/trunk/src/examples/org/apache/seda/examples/EchoProtocolProvider.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/examples/org/apache/seda/examples/EchoProtocolProvider.java	Thu Sep 16 00:02:27 2004
@@ -0,0 +1,173 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.seda.examples;
+
+
+import org.apache.seda.protocol.RequestHandler;
+import org.apache.seda.protocol.HandlerTypeEnum;
+import org.apache.seda.protocol.ProtocolProvider;
+import org.apache.seda.protocol.SingleReplyHandler;
+
+import org.apache.commons.codec.stateful.*;
+
+
+/**
+ * An echo protocol service provider for the SEDA framework.  The provider along
+ * with the SEDA framework implements an RFC 862 compliant server.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public final class EchoProtocolProvider implements ProtocolProvider
+{
+    /** the authoritative service name of this internet protocol: 'echo' */
+    public static final String NAME = "echo";
+    /** a null decoder that triggers a callback returning the substrate as is */
+    public static final StatefulDecoder NULL_DECODER =
+        new AbstractStatefulDecoder()
+        {
+            /**
+             * Simply returns back the encoded data as is.  The codec
+             * is an identity operation.  Each decode call triggers a
+             * callback since we're presuming each chunk to be a valid
+             * PDU while streaming the request back.
+             *
+             * @param encoded the object to return which is a buffer
+             */
+            public void decode( Object encoded )
+            {
+                super.decodeOccurred( encoded );
+            }
+        };
+    /** a null encoder that triggers a callback returning the substrate as is */
+    public static final StatefulEncoder NULL_ENCODER =
+        new AbstractStatefulEncoder()
+        {
+            /**
+             * Simply returns back the decoded data as is.  The codec
+             * is an identity operation.  Each encode call triggers a
+             * callback since we're presuming each chunk to be a valid
+             * PDU while streaming the request back as the response.
+             *
+             * @param substrate the object to return which is a buffer
+             */
+            public void encode( Object substrate )
+            {
+                super.encodeOccurred( substrate );
+            }
+        };
+    public static final SingleReplyHandler NULL_HANDLER =
+        new SingleReplyHandler()
+    {
+        /**
+         * Returns the request (ByteBuffer) back without any changes to
+         * be echoed back to the client.
+         *
+         * @param request the buffer of data to be echoed.
+         * @return the response to the request but resp = req here.
+         */
+        public Object handle( Object request )
+        {
+            return request;
+        }
+
+        /**
+         * Gets the handler type.
+         *
+         * @return a HandlerTypeEnum constant.
+         */
+        public HandlerTypeEnum getHandlerType()
+        {
+            return HandlerTypeEnum.SINGLEREPLY;
+        }
+    };
+        
+    /**
+     * Gets the authoritative name for the service of this provider.
+     *
+     * @return the authoritative service name
+     */
+    public final String getName()
+    {
+        return NAME;
+    }
+
+    /**
+     * Gets a factory used to create a new StatefulDecoder for this service's
+     * protocol.
+     *
+     * @return a new factory for this service's StatefulDecoder
+     */
+    public final DecoderFactory getDecoderFactory()
+    {
+        return new DecoderFactory()
+        {
+            /**
+             * Supplies the decoder of this really simple factory: it returns
+             * its request in the same decode call; 1:1 decode to callback.
+             * The same decoder will be returned every time since no state is
+             * ever stored by these decoders.
+             *
+             * @return the shared NULL_DECODER instance
+             */
+            public StatefulDecoder createDecoder()
+            {
+                return NULL_DECODER;
+            }
+        };
+    }
+
+    /**
+     * Gets a factory used to create a new StatefulEncoder for this service's
+     * protocol.
+     *
+     * @return a new factory for this service's StatefulEncoder
+     */
+    public EncoderFactory getEncoderFactory()
+    {
+        return new EncoderFactory()
+        {
+            /**
+             * Always returns the same encoder, which never really maintains
+             * any state.  This encoder simply returns the object to encode
+             * as is without affecting it in any way.
+             *
+             * @return the shared NULL_ENCODER instance
+             */
+            public StatefulEncoder createEncoder()
+            {
+                return NULL_ENCODER;
+            }
+        };
+    }
+
+    /**
+     * Gets the handler for a request, which is always NULL_HANDLER.  The echo
+     * protocol provider chunks portions of a stream, presenting each available
+     * chunk as a request (PDU) in itself to be echoed back.  This approach
+     * brings about an issue when synchronizing responses to requests: we will
+     * need a mechanism to detect this from the provider as well as a means to
+     * order response returns by their request's sequence id for the client.
+     *
+     * @param request the chunk to echo; ignored when selecting the handler
+     * @return NULL_HANDLER, which hands each request back as its response
+     */
+    public RequestHandler getHandler( Object request )
+    {
+        return NULL_HANDLER;
+    }
+}

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java	Thu Sep 16 00:02:27 2004
@@ -68,7 +68,7 @@
         OutputManager outMan = createOutputManager( router );
         DecoderManager decMan = createDecoderManager( router, inetDb );
         EncoderManager encMan = createEncoderManager( router, inetDb );
-        RequestProcessor reqProc = createRequestProcessor( router );
+        RequestProcessor reqProc = createRequestProcessor( router, inetDb );
 
         DefaultFrontend fe = new DefaultFrontend( bp, decMan, encMan, router,
             inMan, listMan, outMan, reqProc, inetDb );
@@ -101,7 +101,10 @@
     // no deps
     private InetServicesDatabase createServicesDatabase()
     {
-        InetServiceEntry[] entries = { new InetServiceEntry( "ldap", 389 ) };
+        InetServiceEntry[] entries = {
+            new InetServiceEntry( "echo", 7 ),
+            new InetServiceEntry( "ldap", 389 )
+        };
 
         // @todo add a monitor interface for this service and logging impl
         InetServicesDatabase inetDb =
@@ -185,12 +188,13 @@
     }
 
 
-    private RequestProcessor createRequestProcessor( EventRouter router )
+    private RequestProcessor createRequestProcessor( EventRouter router,
+                                                     InetServicesDatabase inetDb )
     {
         DefaultStageConfig config = new DefaultStageConfig( "requestProcessor",
             createThreadPool( 3 ) );
         DefaultRequestProcessor reqProc =
-            new DefaultRequestProcessor( router, config );
+            new DefaultRequestProcessor( router, config, inetDb );
         reqProc.start();
         return reqProc;
     }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	Thu Sep 16 00:02:27 2004
@@ -210,7 +210,19 @@
                 router.publish( event );
             }
         });
-        decoders.put( key, decoder );
+
+        /*
+         * For potential race conditions between ConnectEvent processing and
+         * the processing of the first InputEvent we synchronize on the decoders
+         * and notify all when we have altered it.  The thread that is waiting
+         * for a client decoder in the decoders map will wait on the map until
+         * awoken.
+         */
+        synchronized( decoders )
+        {
+            decoders.put( key, decoder );
+            decoders.notifyAll();
+        }
     }
 
 
@@ -307,6 +319,32 @@
      */
     StatefulDecoder getDecoder( ClientKey key )
     {
-        return ( StatefulDecoder ) decoders.get( key );
+        StatefulDecoder decoder = null;
+
+        /*
+         * The ConnectEvent that registers this client's decoder may still be
+         * in flight when its first InputEvent arrives, so wait on the map and
+         * look the key up again after every wakeup: the notifyAll in inform()
+         * fires for any client.  An interrupt ends the wait and returns null.
+         */
+        synchronized( decoders )
+        {
+            decoder = ( StatefulDecoder ) decoders.get( key );
+            while ( decoder == null )
+            {
+                try
+                {
+                    decoders.wait();
+                    decoder = ( StatefulDecoder ) decoders.get( key );
+                }
+                catch ( InterruptedException e )
+                {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+
+        return decoder;
     }
 }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	Thu Sep 16 00:02:27 2004
@@ -76,6 +76,7 @@
         monitor = new EncoderManagerMonitorAdapter();
         this.inetdb = inetdb;
         this.router = router;
+        this.router.subscribe( ConnectEvent.class, this );
         this.router.subscribe( ProtocolEvent.class, this );
         this.router.subscribe( ResponseEvent.class, this );
     }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java	Thu Sep 16 00:02:27 2004
@@ -17,10 +17,7 @@
 package org.apache.seda.listener;
 
 
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.EventObject;
+import java.util.*;
 
 import java.io.IOException;
 
@@ -33,10 +30,7 @@
 import java.nio.channels.SocketChannel;
 import java.nio.channels.ServerSocketChannel;
 
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.DisconnectEvent;
-import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.*;
 
 
 /**
@@ -49,6 +43,7 @@
 public class DefaultListenerManager 
     implements
     DisconnectSubscriber,
+    ProtocolSubscriber,
     ListenerManager,
     Runnable
 {
@@ -56,6 +51,8 @@
     private final EventRouter router;
     /** selector used to select a acceptable socket channel */
     private final Selector selector;
+    /** a map of authoritative service names to their protocol providers */
+    private final Map protocols;
     /** the client keys for accepted connections */
     private final Set clients;
     /** the set of listeners managed */
@@ -84,6 +81,7 @@
         this.router = router;
         this.clients = new HashSet();
         this.selector = Selector.open();
+        this.protocols = new HashMap();
         this.listeners = new HashSet();
         this.hasStarted = new Boolean( false );
         this.bindListeners = new HashSet();
@@ -266,7 +264,17 @@
         inform( ( DisconnectEvent ) event );
     }
     
-    
+
+    /**
+     * Informs this subscriber of a protocol event.
+     *
+     * @param event the protocol event to inform of
+     */
+    public void inform( ProtocolEvent event )
+    {
+    }
+
+
     // ------------------------------------------------------------------------
     // Runnable implementation and start/stop controls
     // ------------------------------------------------------------------------

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java	Thu Sep 16 00:02:27 2004
@@ -19,14 +19,13 @@
 
 import java.util.Iterator;
 import java.util.EventObject;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.event.RequestEvent;
-import org.apache.seda.event.ResponseEvent;
-import org.apache.seda.event.RequestSubscriber;
-import org.apache.seda.event.AbstractSubscriber;
+import org.apache.seda.event.*;
 
 import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
 import org.apache.seda.stage.*;
 
 
@@ -37,28 +36,35 @@
  * @version $Rev$
  */
 public class DefaultRequestProcessor extends DefaultStage
-    implements RequestProcessor, RequestSubscriber
+    implements RequestProcessor, RequestSubscriber, ProtocolSubscriber
 {
+    private final Map protocols;
     private final EventRouter router;
+    private final InetServicesDatabase inetDb;
+
     private RequestProcessorMonitor monitor = null;
-    
-    
+
+
     /**
      * Creates a default RequestProcessor.
      * 
      * @param router the event router we subscribe and publish to
      * @param config the configuration for this stage 
      */
-    public DefaultRequestProcessor( EventRouter router, StageConfig config )
+    public DefaultRequestProcessor( EventRouter router, StageConfig config,
+                                    InetServicesDatabase inetDb )
     {
         super( config );
         
         DefaultStageConfig defaultConfig = ( DefaultStageConfig ) config;
         defaultConfig.setHandler( new ProcessorStageHandler() );
         super.setMonitor( new LoggingStageMonitor( getClass() ) );
-        
+
+        this.inetDb = inetDb;
         this.router = router;
         this.router.subscribe( RequestEvent.class, this );
+        this.router.subscribe( ProtocolEvent.class, this );
+        this.protocols = new HashMap( 3 );
         this.monitor = new RequestProcessorMonitorAdapter();
     }
     
@@ -88,7 +94,19 @@
         }
     }
 
-    
+
+    /**
+     * Informs this subscriber of a protocol event.
+     *
+     * @param event the protocol event to inform of
+     */
+    public void inform( ProtocolEvent event )
+    {
+        ProtocolProvider proto = event.getProtocolProvider();
+        protocols.put( proto.getName(), proto );
+    }
+
+
     class ProcessorStageHandler implements StageHandler
     {
         /**
@@ -129,13 +147,20 @@
 
     private RequestHandler getProtocolHandler( ClientKey key, Object request )
     {
-        if ( key == null || request == null )
+        String name = null;
+
+        try
+        {
+            name = inetDb.getProtoByPort( key.getSocket().getLocalPort() );
+        }
+        catch ( KeyExpiryException e )
         {
-            throw new NullPointerException(
-                    "both key and request must not be null" );
+            monitor.keyExpired( key, request, e );
+            throw new IllegalStateException( "key expired can't service req" );
         }
 
-        throw new UnsupportedOperationException( "need mech to acquire handler" );
+        ProtocolProvider proto = ( ProtocolProvider ) protocols.get( name );
+        return proto.getHandler( request );
     }
 
 

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java	Thu Sep 16 00:02:27 2004
@@ -21,6 +21,7 @@
 
 import org.apache.seda.event.Subscriber;
 import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
 
 
 /**
@@ -50,4 +51,6 @@
      * @param t the failure that occurred while processing the request
      */
     void failedOnSingleReply( ClientKey key, Object request, Throwable t );
+
+    void keyExpired( ClientKey key, Object request, KeyExpiryException e );
 }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java	Thu Sep 16 00:02:27 2004
@@ -21,6 +21,7 @@
 
 import org.apache.seda.event.Subscriber;
 import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
 
 
 /**
@@ -57,6 +58,15 @@
         if ( t != null )
         {
             t.printStackTrace();
+        }
+    }
+
+
+    public void keyExpired( ClientKey key, Object req, KeyExpiryException e )
+    {
+        if ( e != null )
+        {
+            e.printStackTrace();
         }
     }
 }

Modified: incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java
==============================================================================
--- incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java	(original)
+++ incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java	Thu Sep 16 00:02:27 2004
@@ -21,6 +21,8 @@
 import org.apache.seda.listener.ListenerConfig;
 import org.apache.seda.listener.DefaultListenerConfig;
 import org.apache.seda.protocol.InetServiceEntry;
+import org.apache.seda.examples.EchoProtocolProvider;
+import org.apache.commons.net.EchoTCPClient;
 
 import java.io.IOException;
 
@@ -90,5 +92,24 @@
             new InetServiceEntry( "ldap", 10389 ) );
         fe.getListenerManager().bind( config );
         fe.getListenerManager().unbind( config );
+    }
+
+
+    public void testEcho() throws IOException, InterruptedException
+    {
+        fe.getListenerManager().bind( new DefaultListenerConfig(
+            new byte[] { 127,0,0,1}, 20, false,
+            new InetServiceEntry( "echo", 7 ) ) );
+        fe.register( new EchoProtocolProvider() );
+        EchoTCPClient client = new EchoTCPClient();
+        client.connect( "localhost", 7 );
+        byte[] toSend = "Hello world!".getBytes();
+        byte[] received = new byte[toSend.length];
+        client.getOutputStream().write( toSend );
+        // a single read() may return only part of the echo, so read it all
+        new java.io.DataInputStream( client.getInputStream() )
+            .readFully( received );
+        client.disconnect();
+        assertEquals( new String( toSend ), new String( received ) );
     }
 }