You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/10/23 02:37:31 UTC

svn commit: rev 55340 - incubator/directory/seda/trunk/src/java/org/apache/seda/output

Author: trustin
Date: Fri Oct 22 17:37:30 2004
New Revision: 55340

Added:
   incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java   (contents, props changed)
Log:
Implemented UDP output manager

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java	Fri Oct 22 17:37:30 2004
@@ -0,0 +1,231 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.seda.output;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.util.EventObject;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.seda.event.AbstractSubscriber;
+import org.apache.seda.event.ConnectEvent;
+import org.apache.seda.event.ConnectSubscriber;
+import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.OutputEvent;
+import org.apache.seda.event.OutputSubscriber;
+import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
+import org.apache.seda.listener.UDPClientKey;
+import org.apache.seda.stage.DefaultStage;
+import org.apache.seda.stage.DefaultStageConfig;
+import org.apache.seda.stage.LoggingStageMonitor;
+import org.apache.seda.stage.StageHandler;
+
+
+/**
+ * An OutputManager implementation that writes output to UDP clients.
+ * <p>
+ * The manager subscribes to connect, disconnect and output events on the
+ * supplied router.  Connect events register the client's DatagramChannel
+ * under its ClientKey, disconnect events drop that registration, and output
+ * events are enqueued on this stage so that the installed
+ * {@link OutputStageHandler} can send their buffers with
+ * {@link #write(ClientKey, ByteBuffer)}.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class UDPOutputManager extends DefaultStage
+    implements 
+    OutputManager, 
+    OutputSubscriber, 
+    ConnectSubscriber, 
+    DisconnectSubscriber
+{
+    /** the router we subscribe for OutputEvents on */
+    private final EventRouter router;
+    /** the monitor used to track notable events in this OutputManager */
+    private OutputMonitor monitor;
+    /**
+     * a map of DatagramChannels by ClientKey
+     * 
+     * NOTE(review): this is a plain HashMap that is read by write() and
+     * modified by the connect/disconnect inform() methods; if those run on
+     * different threads the map needs external synchronization - confirm
+     * against the router and stage threading model.
+     */
+    private Map channels = new HashMap();
+
+
+    // ------------------------------------------------------------------------
+    // constructors
+    // ------------------------------------------------------------------------
+
+
+    /**
+     * Creates a UDP OutputManager with logging stage and output monitors.
+     * 
+     * @param router the router we subscribe for OutputEvents on
+     * @param config the configuration for this Stage
+     */
+    public UDPOutputManager( EventRouter router, DefaultStageConfig config )
+    {
+        super( config );
+        this.router = router;
+
+        // Finish initializing this instance before it is handed to the
+        // router: once subscribed, inform() may be called and every inform
+        // path dereferences the output monitor.
+        config.setHandler( new OutputStageHandler() );
+        this.setMonitor( new LoggingStageMonitor() );
+        this.setOutputMonitor( new LoggingOutputMonitor() );
+
+        this.router.subscribe( OutputEvent.class, this );
+        this.router.subscribe( ConnectEvent.class, this );
+        this.router.subscribe( DisconnectEvent.class, this );
+    }
+
+
+    // ------------------------------------------------------------------------
+    // subscriber inform methods 
+    // ------------------------------------------------------------------------
+
+
+    /**
+     * Dispatches a generic event through AbstractSubscriber.inform() and
+     * reports anything thrown while doing so to the output monitor instead
+     * of propagating it to the caller.
+     * 
+     * @param event the event to dispatch
+     * @see org.apache.seda.event.Subscriber#inform(java.util.EventObject)
+     */
+    public void inform( EventObject event )
+    {
+        try
+        {
+            AbstractSubscriber.inform( this, event );
+        }
+        catch( Throwable t )
+        {
+            monitor.failedOnInform( this, event, t );
+        }
+    }
+
+
+    /**
+     * Enqueues the output event on this stage for later processing.
+     * 
+     * @param event the output event carrying the client key and buffer
+     * @see org.apache.seda.event.OutputSubscriber#inform(
+     * org.apache.seda.event.OutputEvent)
+     */
+    public void inform( OutputEvent event )
+    {
+        enqueue( event );
+    }
+
+
+    /**
+     * Registers the channel of a newly connected UDP client under its key.
+     * If the key has already expired nothing is registered and only the
+     * expiry is reported to the monitor.
+     * 
+     * @param event the connect event; its client key is cast to UDPClientKey
+     * @see org.apache.seda.event.ConnectSubscriber#inform(
+     * org.apache.seda.event.ConnectEvent)
+     */
+    public void inform( ConnectEvent event )
+    {
+        UDPClientKey key = (UDPClientKey) event.getClientKey();
+
+        try
+        {
+            channels.put( key, key.getSocket().getChannel() );
+        }
+        catch( KeyExpiryException e )
+        {
+            monitor.keyExpired( this, key, e );
+            // The client was never added, so do not report it as added.
+            return;
+        }
+
+        monitor.addedClient( this, event );
+    }
+
+
+    /**
+     * Drops the channel registered for the disconnected client, if any, and
+     * reports the removal to the monitor.
+     * 
+     * @param event the disconnect event identifying the client key
+     * @see org.apache.seda.event.DisconnectSubscriber#inform(
+     * org.apache.seda.event.DisconnectEvent)
+     */
+    public void inform( DisconnectEvent event )
+    {
+        channels.remove( event.getClientKey() );
+        monitor.removedClient( this, event );
+    }
+
+
+    // ------------------------------------------------------------------------
+    // OutputManager method
+    // ------------------------------------------------------------------------
+
+
+    /**
+     * Sends the buffer to the client's remote address over the channel
+     * registered for the key.  The send is performed while holding the
+     * key's output lock, and all threads waiting on that lock are notified
+     * afterwards.  If no channel is registered for the key, or the key has
+     * expired, the condition is reported to the monitor and nothing is sent.
+     * 
+     * @param key the key of the client to write to
+     * @param buf the data to send
+     * @throws IOException if the channel fails to send the buffer
+     * @see org.apache.seda.output.OutputManager#write(
+     * org.apache.seda.listener.ClientKey, java.nio.ByteBuffer)
+     */
+    public void write( ClientKey key, ByteBuffer buf )
+        throws IOException
+    {
+        DatagramChannel channel = ( DatagramChannel ) channels.get( key );
+
+        if ( null == channel ) 
+        {
+            monitor.channelMissing( this, key );
+            return;
+        }
+
+        // Obtain output lock for write to client.
+        Object lock;
+
+        try 
+        {
+            lock = key.getOutputLock();
+        } 
+        catch ( KeyExpiryException e )  
+        {
+            monitor.keyExpired( this, key, e );
+            return;
+        }
+
+        // synchronize on client output lock object.
+        synchronized( lock ) 
+        {
+            monitor.writeLockAcquired( this, key ) ;
+            channel.send( buf, key.getRemoteAddress() );
+            lock.notifyAll();
+        }
+
+        monitor.writeOccurred( this, key );
+    }
+
+
+    /**
+     * Sets the output manager's monitor.
+     * 
+     * @param monitor the monitor used by this output manager
+     */
+    public void setOutputMonitor( OutputMonitor monitor )
+    {
+        this.monitor = monitor;
+    }
+
+
+    /**
+     * EventHandler designed for processing output events.
+     */
+    class OutputStageHandler implements StageHandler
+    {
+        /**
+         * Writes the buffer of an OutputEvent to its client and reports an
+         * IOException raised by the write to the output monitor.  Events of
+         * any other type are ignored.
+         * 
+         * @param generic the event handed to this handler
+         */
+        public void handleEvent( EventObject generic )
+        {
+            if ( generic instanceof OutputEvent )
+            {    
+                OutputEvent event = ( OutputEvent ) generic;
+
+                try
+                {
+                    write( event.getClientKey(), event.getBuffer() );
+                }
+                catch ( IOException e )
+                {
+                    monitor.failedOnWrite( UDPOutputManager.this, 
+                            event.getClientKey(), e );
+                }
+            }
+        }
+    }
+}