You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ak...@apache.org on 2004/09/16 09:02:30 UTC
svn commit: rev 46164 - in incubator/directory/seda/trunk: . src/examples/org/apache/seda/examples src/java/org/apache/seda src/java/org/apache/seda/decoder src/java/org/apache/seda/encoder src/java/org/apache/seda/listener src/java/org/apache/seda/protocol src/test/org/apache/seda
Author: akarasulu
Date: Thu Sep 16 00:02:27 2004
New Revision: 46164
Added:
incubator/directory/seda/trunk/src/examples/org/apache/seda/examples/EchoProtocolProvider.java (contents, props changed)
Modified:
incubator/directory/seda/trunk/ (props changed)
incubator/directory/seda/trunk/project.xml
incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java
incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java
Log:
Commit changes ...
o added new Echo ProtocolHandler
o added test case for handler
o started using commons-net for testing server
o fixed a race condition between ConnectEvent and first InputEvent processing
o added missing router subscriptions on EncoderManager
o added missing code to resolve a protocol and request handler in ReqProc
Modified: incubator/directory/seda/trunk/project.xml
==============================================================================
--- incubator/directory/seda/trunk/project.xml (original)
+++ incubator/directory/seda/trunk/project.xml Thu Sep 16 00:02:27 2004
@@ -125,6 +125,13 @@
</dependency>
<dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>1.2.1</version>
+ <url>http://jakarta.apache.org/commons/net</url>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
Added: incubator/directory/seda/trunk/src/examples/org/apache/seda/examples/EchoProtocolProvider.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/examples/org/apache/seda/examples/EchoProtocolProvider.java Thu Sep 16 00:02:27 2004
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.seda.examples;
+
+
+import org.apache.seda.protocol.RequestHandler;
+import org.apache.seda.protocol.HandlerTypeEnum;
+import org.apache.seda.protocol.ProtocolProvider;
+import org.apache.seda.protocol.SingleReplyHandler;
+
+import org.apache.commons.codec.stateful.*;
+
+
+/**
+ * An echo protocol service provider for the SEDA framework. The provider along
+ * with the SEDA framework implements an RFC 862 compliant server.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public final class EchoProtocolProvider implements ProtocolProvider
+{
+ /** the authoritative service name of this internet protocol: 'echo' */
+ public static final String NAME = "echo";
+ /** a null decoder that triggers a callback returning the substrate as is */
+ public static final StatefulDecoder NULL_DECODER =
+ new AbstractStatefulDecoder()
+ {
+ /**
+ * Simply returns back the encoded data as is. The codec
+ * is an identity operation. Each decode call triggers a
+ * callback since we're presuming each chunk to be a valid
+ * PDU while streaming the request back.
+ *
+ * @param encoded the object to return which is a buffer
+ */
+ public void decode( Object encoded )
+ {
+ super.decodeOccurred( encoded );
+ }
+ };
+ /** a null encoder that triggers a callback returning the substrate as is */
+ public static final StatefulEncoder NULL_ENCODER =
+ new AbstractStatefulEncoder()
+ {
+ /**
+ * Simply returns back the decoded data as is. The codec
+ * is an identity operation. Each encode call triggers a
+ * callback since we're presuming each chunk to be a valid
+ * PDU while streaming the request back as the response.
+ *
+ * @param substrate the object to return which is a buffer
+ */
+ public void encode( Object substrate )
+ {
+ super.encodeOccurred( substrate );
+ }
+ };
+ public static final SingleReplyHandler NULL_HANDLER =
+ new SingleReplyHandler()
+ {
+ /**
+ * Returns the request (ByteBuffer) back without any changes to
+ * be echo'd out back to the client.
+ *
+ * @param request the buffer of data to be echo'd.
+ * @return the response to the request but resp = req here.
+ */
+ public Object handle( Object request )
+ {
+ return request;
+ }
+
+ /**
+ * Gets the handler type.
+ *
+ * @return a HandlerTypeEnum constant.
+ */
+ public HandlerTypeEnum getHandlerType()
+ {
+ return HandlerTypeEnum.SINGLEREPLY;
+ }
+ };
+
+ /**
+ * Gets the authoritative name for the service of this provider.
+ *
+ * @return the authoritative service name
+ */
+ public final String getName()
+ {
+ return NAME;
+ }
+
+ /**
+ * Gets a factory used to create a new StatefulDecoder for this service's
+ * protocol.
+ *
+ * @return a new StatefulDecoder for this service's protocol
+ */
+ public final DecoderFactory getDecoderFactory()
+ {
+ return new DecoderFactory()
+ {
+ /**
+ * Creates a really simple factory for decoders that return back
+ * thir request in the same decode call; 1:1 decode to callback.
+ * The same decoder will be returned evertime since no state is
+ * ever stored by these decoders.
+ *
+ * @return a chunking state based decoder
+ */
+ public StatefulDecoder createDecoder()
+ {
+ return NULL_DECODER;
+ }
+ };
+ }
+
+ /**
+ * Gets a factory used to create a new StatefulEncoder for this service's
+ * protocol.
+ *
+ * @return a new StatefulEncoder for this service's protocol
+ */
+ public EncoderFactory getEncoderFactory()
+ {
+ return new EncoderFactory()
+ {
+ /**
+ * Creates a factory that always returns the same encoder which
+ * never really maintains any state. This encoder simply returns
+ * the object to encode as is without affecting it in any way.
+ *
+ * @return a new chunking state based encoder
+ */
+ public StatefulEncoder createEncoder()
+ {
+ return NULL_ENCODER;
+ }
+ };
+ }
+
+ /**
+ * The echo protocol provider chunks portions of a stream presenting each
+ * chunk as a request in itself. The PDU is an available chunk of data to
+ * be echo's back in this way. This approach however brings about an issue
+ * when synchronizing responses to a request. We will need a machanism to
+ * detect this from the provider no doubt as well as a means to synchronize
+ * response returns based on their request's sequence id for the client.
+ *
+ * @param request the ByteBuffer containing the data to echo
+ * @return the same ByteBuffer without any changes
+ */
+ public RequestHandler getHandler( Object request )
+ {
+ return NULL_HANDLER;
+ }
+}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java Thu Sep 16 00:02:27 2004
@@ -68,7 +68,7 @@
OutputManager outMan = createOutputManager( router );
DecoderManager decMan = createDecoderManager( router, inetDb );
EncoderManager encMan = createEncoderManager( router, inetDb );
- RequestProcessor reqProc = createRequestProcessor( router );
+ RequestProcessor reqProc = createRequestProcessor( router, inetDb );
DefaultFrontend fe = new DefaultFrontend( bp, decMan, encMan, router,
inMan, listMan, outMan, reqProc, inetDb );
@@ -101,7 +101,10 @@
// no deps
private InetServicesDatabase createServicesDatabase()
{
- InetServiceEntry[] entries = { new InetServiceEntry( "ldap", 389 ) };
+ InetServiceEntry[] entries = {
+ new InetServiceEntry( "echo", 7 ),
+ new InetServiceEntry( "ldap", 389 )
+ };
// @todo add a monitor interface for this service and logging impl
InetServicesDatabase inetDb =
@@ -185,12 +188,13 @@
}
- private RequestProcessor createRequestProcessor( EventRouter router )
+ private RequestProcessor createRequestProcessor( EventRouter router,
+ InetServicesDatabase inetDb )
{
DefaultStageConfig config = new DefaultStageConfig( "requestProcessor",
createThreadPool( 3 ) );
DefaultRequestProcessor reqProc =
- new DefaultRequestProcessor( router, config );
+ new DefaultRequestProcessor( router, config, inetDb );
reqProc.start();
return reqProc;
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java Thu Sep 16 00:02:27 2004
@@ -210,7 +210,19 @@
router.publish( event );
}
});
- decoders.put( key, decoder );
+
+ /*
+ * For potential race conditions between ConnectEvent processing and
+ * the processing of the first InputEvent we synchronize on the decoders
+ * and notify all when we have altered it. The thread that is waiting
+ * for a client decoder in the decoders map will wait on the map until
+ * awoken.
+ */
+ synchronized( decoders )
+ {
+ decoders.put( key, decoder );
+ decoders.notifyAll();
+ }
}
@@ -307,6 +319,32 @@
*/
StatefulDecoder getDecoder( ClientKey key )
{
- return ( StatefulDecoder ) decoders.get( key );
+ StatefulDecoder decoder = null;
+
+ /*
+ * We synchronize on the decoders map so we can wait for notification
+ * on it if it does not contain the client decoder. This is in case
+ * the processing of the connect event is slow and the client decoder
+ * has not been created yet. When processing of the ConnectEvent is
+ * complete we are awoken via a notifyAll in the inform() method.
+ */
+ synchronized( decoders )
+ {
+ decoder = ( StatefulDecoder ) decoders.get( key );
+
+ if ( decoder == null )
+ {
+ try
+ {
+ decoders.wait();
+ }
+ catch ( InterruptedException e )
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ return decoder;
}
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java Thu Sep 16 00:02:27 2004
@@ -76,6 +76,7 @@
monitor = new EncoderManagerMonitorAdapter();
this.inetdb = inetdb;
this.router = router;
+ this.router.subscribe( ConnectEvent.class, this );
this.router.subscribe( ProtocolEvent.class, this );
this.router.subscribe( ResponseEvent.class, this );
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/DefaultListenerManager.java Thu Sep 16 00:02:27 2004
@@ -17,10 +17,7 @@
package org.apache.seda.listener;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.EventObject;
+import java.util.*;
import java.io.IOException;
@@ -33,10 +30,7 @@
import java.nio.channels.SocketChannel;
import java.nio.channels.ServerSocketChannel;
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.DisconnectEvent;
-import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.*;
/**
@@ -49,6 +43,7 @@
public class DefaultListenerManager
implements
DisconnectSubscriber,
+ ProtocolSubscriber,
ListenerManager,
Runnable
{
@@ -56,6 +51,8 @@
private final EventRouter router;
/** selector used to select a acceptable socket channel */
private final Selector selector;
+ /** a map of auth service names to their protocol providers */
+ private final Map protocols;
/** the client keys for accepted connections */
private final Set clients;
/** the set of listeners managed */
@@ -84,6 +81,7 @@
this.router = router;
this.clients = new HashSet();
this.selector = Selector.open();
+ this.protocols = new HashMap();
this.listeners = new HashSet();
this.hasStarted = new Boolean( false );
this.bindListeners = new HashSet();
@@ -266,7 +264,17 @@
inform( ( DisconnectEvent ) event );
}
-
+
+ /**
+ * Informs this subscriber of a protocol event.
+ *
+ * @param event the protocol event to inform of
+ */
+ public void inform( ProtocolEvent event )
+ {
+ }
+
+
// ------------------------------------------------------------------------
// Runnable implementation and start/stop controls
// ------------------------------------------------------------------------
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java Thu Sep 16 00:02:27 2004
@@ -19,14 +19,13 @@
import java.util.Iterator;
import java.util.EventObject;
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.event.RequestEvent;
-import org.apache.seda.event.ResponseEvent;
-import org.apache.seda.event.RequestSubscriber;
-import org.apache.seda.event.AbstractSubscriber;
+import org.apache.seda.event.*;
import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
import org.apache.seda.stage.*;
@@ -37,28 +36,35 @@
* @version $Rev$
*/
public class DefaultRequestProcessor extends DefaultStage
- implements RequestProcessor, RequestSubscriber
+ implements RequestProcessor, RequestSubscriber, ProtocolSubscriber
{
+ private final Map protocols;
private final EventRouter router;
+ private final InetServicesDatabase inetDb;
+
private RequestProcessorMonitor monitor = null;
-
-
+
+
/**
* Creates a default RequestProcessor.
*
* @param router the event router we subscribe and publish to
* @param config the configuration for this stage
*/
- public DefaultRequestProcessor( EventRouter router, StageConfig config )
+ public DefaultRequestProcessor( EventRouter router, StageConfig config,
+ InetServicesDatabase inetDb )
{
super( config );
DefaultStageConfig defaultConfig = ( DefaultStageConfig ) config;
defaultConfig.setHandler( new ProcessorStageHandler() );
super.setMonitor( new LoggingStageMonitor( getClass() ) );
-
+
+ this.inetDb = inetDb;
this.router = router;
this.router.subscribe( RequestEvent.class, this );
+ this.router.subscribe( ProtocolEvent.class, this );
+ this.protocols = new HashMap( 3 );
this.monitor = new RequestProcessorMonitorAdapter();
}
@@ -88,7 +94,19 @@
}
}
-
+
+ /**
+ * Informs this subscriber of a protocol event.
+ *
+ * @param event the protocol event to inform of
+ */
+ public void inform( ProtocolEvent event )
+ {
+ ProtocolProvider proto = event.getProtocolProvider();
+ protocols.put( proto.getName(), proto );
+ }
+
+
class ProcessorStageHandler implements StageHandler
{
/**
@@ -129,13 +147,20 @@
private RequestHandler getProtocolHandler( ClientKey key, Object request )
{
- if ( key == null || request == null )
+ String name = null;
+
+ try
+ {
+ name = inetDb.getProtoByPort( key.getSocket().getLocalPort() );
+ }
+ catch ( KeyExpiryException e )
{
- throw new NullPointerException(
- "both key and request must not be null" );
+ monitor.keyExpired( key, request, e );
+ throw new IllegalStateException( "key expired can't service req" );
}
- throw new UnsupportedOperationException( "need mech to acquire handler" );
+ ProtocolProvider proto = ( ProtocolProvider ) protocols.get( name );
+ return proto.getHandler( request );
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitor.java Thu Sep 16 00:02:27 2004
@@ -21,6 +21,7 @@
import org.apache.seda.event.Subscriber;
import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
/**
@@ -50,4 +51,6 @@
* @param t the failure that occurred while processing the request
*/
void failedOnSingleReply( ClientKey key, Object request, Throwable t );
+
+ void keyExpired( ClientKey key, Object request, KeyExpiryException e );
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/RequestProcessorMonitorAdapter.java Thu Sep 16 00:02:27 2004
@@ -21,6 +21,7 @@
import org.apache.seda.event.Subscriber;
import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
/**
@@ -57,6 +58,15 @@
if ( t != null )
{
t.printStackTrace();
+ }
+ }
+
+
+ public void keyExpired( ClientKey key, Object req, KeyExpiryException e )
+ {
+ if ( e != null )
+ {
+ e.printStackTrace();
}
}
}
Modified: incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java
==============================================================================
--- incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java (original)
+++ incubator/directory/seda/trunk/src/test/org/apache/seda/DefaultFrontendFactoryTest.java Thu Sep 16 00:02:27 2004
@@ -21,6 +21,8 @@
import org.apache.seda.listener.ListenerConfig;
import org.apache.seda.listener.DefaultListenerConfig;
import org.apache.seda.protocol.InetServiceEntry;
+import org.apache.seda.examples.EchoProtocolProvider;
+import org.apache.commons.net.EchoTCPClient;
import java.io.IOException;
@@ -90,5 +92,24 @@
new InetServiceEntry( "ldap", 10389 ) );
fe.getListenerManager().bind( config );
fe.getListenerManager().unbind( config );
+ }
+
+
+ public void testEcho() throws IOException, InterruptedException
+ {
+ ListenerConfig config = null;
+ config = new DefaultListenerConfig( new byte[] { 127,0,0,1}, 20, false,
+ new InetServiceEntry( "echo", 7 ) );
+ fe.getListenerManager().bind( config );
+ fe.register( new EchoProtocolProvider() );
+
+ EchoTCPClient client = new EchoTCPClient();
+ client.connect( "localhost", 7 );
+ byte[] toSend = "Hello world!".getBytes();
+ byte[] recieved = new byte[toSend.length];
+ client.getOutputStream().write( toSend );
+ client.getInputStream().read( recieved );
+ client.disconnect();
+ assertEquals( new String( toSend ), new String( recieved ) );
}
}