You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2009/07/12 18:03:17 UTC

svn commit: r793367 - in /directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api: LdapConnection.java messages/future/ResponseFuture.java

Author: kayyagari
Date: Sun Jul 12 16:03:16 2009
New Revision: 793367

URL: http://svn.apache.org/viewvc?rev=793367&view=rev
Log:
o added support for abandon request
o created a common Future class to use for all operations(as part of this removed reference to BindFuture)

Added:
    directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java
Modified:
    directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java

Modified: directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java
URL: http://svn.apache.org/viewvc/directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java?rev=793367&r1=793366&r2=793367&view=diff
==============================================================================
--- directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java (original)
+++ directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java Sun Jul 12 16:03:16 2009
@@ -71,7 +71,7 @@
 import org.apache.directory.shared.ldap.client.api.messages.SearchResultDone;
 import org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry;
 import org.apache.directory.shared.ldap.client.api.messages.SearchResultReference;
-import org.apache.directory.shared.ldap.client.api.messages.future.BindFuture;
+import org.apache.directory.shared.ldap.client.api.messages.future.ResponseFuture;
 import org.apache.directory.shared.ldap.client.api.protocol.LdapProtocolCodecFactory;
 import org.apache.directory.shared.ldap.codec.ControlCodec;
 import org.apache.directory.shared.ldap.codec.LdapConstants;
@@ -207,6 +207,9 @@
     /** a map to hold the response listeners based on the operation id */
     private Map<Integer, OperationResponseListener> listenerMap = new ConcurrentHashMap<Integer, OperationResponseListener>();
     
+    /** a map to hold the ResponseFutures for all operations */
+    private Map<Integer, ResponseFuture> futureMap = new ConcurrentHashMap<Integer, ResponseFuture>();
+    
     /** list of controls supported by the server */
     private List<String> supportedControls;
 
@@ -709,6 +712,9 @@
         
         message.setProtocolOP( addReqCodec );
         
+        ResponseFuture addFuture = new ResponseFuture( addResponseQueue );
+        futureMap.put( newId, addFuture );
+
         // Send the request to the server
         ldapSession.write( message );
         
@@ -718,7 +724,7 @@
             try
             {
                 long timeout = getTimeout( addRequest.getTimeout() );
-                response = addResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+                response = ( AddResponse ) addFuture.get( timeout, TimeUnit.MILLISECONDS );
                 
                 if ( response == null )
                 {
@@ -727,9 +733,15 @@
                     throw new LdapException( TIME_OUT_ERROR );
                 }
             }
+            catch( InterruptedException ie )
+            {
+                LOG.error( "Operation would have been cancelled", ie );
+                throw new LdapException( ie );
+            }
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
+                futureMap.remove( newId );
                 unlockSession();
                 throw new LdapException( e );
             }
@@ -818,6 +830,28 @@
 
         // Send the request to the server
         ldapSession.write( message );
+
+        // remove the associated listener if any 
+        int abandonId = abandonRequest.getAbandonedMessageId();
+
+        ResponseFuture rf = futureMap.remove( abandonId );
+        if( rf != null )
+        {
+            LOG.debug( "sending cancel signal to future" );
+            rf.cancel( true );
+        }
+        else
+        {
+            LOG.error( "There is no future asscoiated with operation message ID {}, perhaps the operation would have been completed", abandonId );
+        }
+        
+        OperationResponseListener listener = listenerMap.remove( abandonId );
+
+        if( listener != null )
+        {
+            LOG.error( "removed the listener associated with the abandoned operation with id {}", abandonId );
+        }
+
     }
     
     
@@ -914,7 +948,7 @@
      * @param bindRequest The BindRequest to send
      * @param listener The listener 
      */
-    public BindFuture bind( BindRequest bindRequest, BindListener bindListener ) throws LdapException 
+    public ResponseFuture bind( BindRequest bindRequest, BindListener bindListener ) throws LdapException 
     {
         return bindAsyncInternal( bindRequest, bindListener );
     }
@@ -929,7 +963,7 @@
     private BindResponse bindInternal( BindRequest bindRequest ) throws LdapException 
     {
         // Create the future to get the result
-        BindFuture bindFuture = bindAsyncInternal( bindRequest, null );
+        ResponseFuture bindFuture = bindAsyncInternal( bindRequest, null );
         
         // And get the result
         try
@@ -938,7 +972,7 @@
             long timeout = getTimeout( bindRequest.getTimeout() );
             
             // Get the response, blocking
-            BindResponse bindResponse = bindFuture.get( timeout, TimeUnit.MILLISECONDS );
+            BindResponse bindResponse = ( BindResponse ) bindFuture.get( timeout, TimeUnit.MILLISECONDS );
 
             // Release the session lock
             unlockSession();
@@ -951,7 +985,10 @@
         catch ( TimeoutException te )
         {
             // Send an abandon request
-            abandon( bindRequest.getMessageId() );
+            if( !bindFuture.isCancelled() )
+            {
+                abandon( bindRequest.getMessageId() );
+            }
             
             // We didn't received anything : this is an error
             LOG.error( "Bind failed : timeout occured" );
@@ -967,7 +1004,10 @@
             unlockSession();
             
             // Send an abandon request
-            abandon( bindRequest.getMessageId() );
+            if( !bindFuture.isCancelled() )
+            {
+                abandon( bindRequest.getMessageId() );
+            }
             throw ldapException;
         }
     }
@@ -979,7 +1019,7 @@
      * @param bindRequest The BindRequest to send
      * @param listener The listener (Can be null) 
      */
-    private BindFuture bindAsyncInternal( BindRequest bindRequest, BindListener bindListener ) throws LdapException 
+    private ResponseFuture bindAsyncInternal( BindRequest bindRequest, BindListener bindListener ) throws LdapException 
     {
         // First try to connect, if we aren't already connected.
         connect();
@@ -1055,11 +1095,14 @@
         LOG.debug( "-----------------------------------------------------------------" );
         LOG.debug( "Sending request \n{}", bindMessage );
 
+        ResponseFuture rf = new ResponseFuture( bindResponseQueue );
+        futureMap.put( newId, rf );
+
         // Send the request to the server
         ldapSession.write( bindMessage );
 
         // Return the associated future
-        return new BindFuture( bindResponseQueue );
+        return rf;
     }
     
 
@@ -1225,9 +1268,13 @@
         LOG.debug( "-----------------------------------------------------------------" );
         LOG.debug( "Sending request \n{}", searchMessage );
     
+        ResponseFuture searchFuture = new ResponseFuture( searchResponseQueue );
+        futureMap.put( newId, searchFuture );
+
         // Send the request to the server
         ldapSession.write( searchMessage );
     
+        
         if ( searchListener == null )
         {
             // Read the response, waiting for it if not available immediately
@@ -1240,7 +1287,7 @@
                 // We may have more than one response, so loop on the queue
                 do 
                 {
-                    response = searchResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+                    response = ( SearchResponse ) searchFuture.get( timeout, TimeUnit.MILLISECONDS );
 
                     // Check that we didn't get out because of a timeout
                     if ( response == null )
@@ -1249,7 +1296,7 @@
                         abandon( searchMessage.getSearchRequest().getMessageId() );
                         
                         // We didn't received anything : this is an error
-                        LOG.error( "Bind failed : timeout occured" );
+                        LOG.error( "Search failed : timeout occured" );
                         unlockSession();
                         throw new LdapException( TIME_OUT_ERROR );
                     }
@@ -1274,14 +1321,22 @@
                 
                 return new ListCursor<SearchResponse>( searchResponses );
             }
-            catch ( InterruptedException ie )
+            catch( InterruptedException ie )
+            {
+                LOG.error( "Operation would have been cancelled", ie );
+                throw new LdapException( ie );
+            }
+            catch ( Exception e )
             {
                 LOG.error( "The response queue has been emptied, no response will be find." );
                 LdapException ldapException = new LdapException();
-                ldapException.initCause( ie );
+                ldapException.initCause( e );
                 
                 // Send an abandon request
-                abandon( searchMessage.getBindRequest().getMessageId() );
+                if( !searchFuture.isCancelled() )
+                {
+                    abandon( searchMessage.getBindRequest().getMessageId() );
+                }
                 throw ldapException;
             }
         }
@@ -1375,8 +1430,17 @@
 
         LOG.debug( "-------> {} Message received <-------", response.getMessageTypeName() );
         
-        SearchListener searchListener = null;
+        // this check is necessary to prevent adding an abandoned operation's
+        // result(s) to corresponding queue
+        ResponseFuture rf = futureMap.get( response.getMessageId() );
+        if( rf == null )
+        {
+            LOG.error( "There is no future associated with the messageId {}, ignoring the message", response.getMessageId()  );
+            return;
+        }
         
+        SearchListener searchListener = null;
+
         switch ( response.getMessageType() )
         {
             case LdapConstants.ADD_RESPONSE :
@@ -1385,6 +1449,7 @@
                 addRespCodec.addControl( response.getCurrentControl() );
                 addRespCodec.setMessageId( response.getMessageId() );
                 
+                futureMap.remove( addRespCodec.getMessageId() );
                 AddListener addListener = ( AddListener ) listenerMap.remove( addRespCodec.getMessageId() );
                 AddResponse addResp = convert( addRespCodec );
                 if( addListener != null )
@@ -1403,6 +1468,7 @@
                 bindResponseCodec.addControl( response.getCurrentControl() );
                 BindResponse bindResponse = convert( bindResponseCodec );
 
+                futureMap.remove( bindResponseCodec.getMessageId() );
                 // remove the listener from the listener map
                 BindListener bindListener = ( BindListener ) listenerMap.remove( bindResponseCodec.getMessageId() );
                 if ( bindListener != null )
@@ -1428,8 +1494,9 @@
                 delRespCodec.setMessageId( response.getMessageId() );
                 delRespCodec.addControl( response.getCurrentControl() );
                 DeleteResponse delResp = convert( delRespCodec );
-                DeleteListener delListener = ( DeleteListener ) listenerMap.remove( delResp.getMessageId() );
                 
+                futureMap.remove( delResp.getMessageId() );
+                DeleteListener delListener = ( DeleteListener ) listenerMap.remove( delResp.getMessageId() );
                 if( delListener != null )
                 {
                     delListener.entryDeleted( this, delResp );
@@ -1470,8 +1537,9 @@
                 ModifyResponseCodec modRespCodec = response.getModifyResponse();
                 modRespCodec.setMessageId( response.getMessageId() );
                 modRespCodec.addControl( response.getCurrentControl() );
-                
                 ModifyResponse modResp = convert( modRespCodec );
+
+                futureMap.remove( modResp.getMessageId() );
                 ModifyListener modListener = ( ModifyListener ) listenerMap.remove( modResp.getMessageId() );
                 
                 if( modListener != null )
@@ -1490,6 +1558,8 @@
                 modDnCodec.addControl( response.getCurrentControl() );
                 modDnCodec.setMessageId( response.getMessageId() );
                 ModifyDnResponse modDnResp = convert( modDnCodec );
+                
+                futureMap.remove( modDnCodec.getMessageId() );
                 ModifyDnListener modDnListener = ( ModifyDnListener ) listenerMap.remove( modDnCodec.getMessageId() );
                 if( modDnListener != null )
                 {
@@ -1509,6 +1579,8 @@
                 searchResultDoneCodec.setMessageId( response.getMessageId() );
                 searchResultDoneCodec.addControl( response.getCurrentControl() );
                 SearchResultDone srchDone = convert( searchResultDoneCodec );
+                
+                futureMap.remove( searchResultDoneCodec.getMessageId() );
                 // search listener has to be removed from listener map only here
                 searchListener = ( SearchListener ) listenerMap.remove( searchResultDoneCodec.getMessageId() );
                 if ( searchListener != null )
@@ -1531,6 +1603,7 @@
                 
                 SearchResultEntry srchEntry = convert( searchResultEntryCodec );
                 searchListener = ( SearchListener ) listenerMap.get( searchResultEntryCodec.getMessageId() );
+                
                 if ( searchListener != null )
                 {
                     searchListener.entryFound( this, srchEntry );
@@ -1623,16 +1696,19 @@
 
         modifyMessage.setProtocolOP( modReqCodec );
         setControls( modRequest.getControls(), modifyMessage );
+
+        ResponseFuture modifyFuture = new ResponseFuture( modifyResponseQueue );
+        futureMap.put( newId, modifyFuture );
         
         ldapSession.write( modifyMessage );
-
+        
         ModifyResponse response = null;
         if( listener == null )
         {
             try
             {
                 long timeout = getTimeout( modRequest.getTimeout() );
-                response = modifyResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+                response = ( ModifyResponse ) modifyFuture.get( timeout, TimeUnit.MILLISECONDS );
                 
                 if ( response == null )
                 {
@@ -1641,9 +1717,15 @@
                     throw new LdapException( TIME_OUT_ERROR );
                 }
             }
+            catch( InterruptedException ie )
+            {
+                LOG.error( "Operation would have been cancelled", ie );
+                throw new LdapException( ie );
+            }
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
+                futureMap.remove( newId );
                 unlockSession();
                 throw new LdapException( e );
             }
@@ -1800,6 +1882,9 @@
         modifyDnMessage.setProtocolOP( modDnCodec );
         setControls( modDnRequest.getControls(), modifyDnMessage );
         
+        ResponseFuture modifyDNFuture = new ResponseFuture( modifyDNResponseQueue );
+        futureMap.put( newId, modifyDNFuture );
+        
         ldapSession.write( modifyDnMessage );
         
         if( listener == null )
@@ -1808,7 +1893,7 @@
             try
             {
                 long timeout = getTimeout( modDnRequest.getTimeout() );
-                response = modifyDNResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+                response = ( ModifyDnResponse ) modifyDNFuture.get( timeout, TimeUnit.MILLISECONDS );
                 
                 if ( response == null )
                 {
@@ -1817,9 +1902,15 @@
                     throw new LdapException( TIME_OUT_ERROR );
                 }
             }
+            catch( InterruptedException ie )
+            {
+                LOG.error( "Operation would have been cancelled", ie );
+                throw new LdapException( ie );
+            }
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
+                futureMap.remove( newId );
                 unlockSession();
                 LdapException ldapException = new LdapException();
                 ldapException.initCause( e );
@@ -2034,6 +2125,9 @@
         deleteMessage.setProtocolOP( delCodec );
         setControls( delRequest.getControls(), deleteMessage );
         
+        ResponseFuture deleteFuture = new ResponseFuture( deleteResponseQueue );
+        futureMap.put( newId, deleteFuture );
+
         ldapSession.write( deleteMessage );
         
         DeleteResponse response = null;
@@ -2042,7 +2136,7 @@
             try
             {
                 long timeout = getTimeout( delRequest.getTimeout() );
-                response = deleteResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+                response = ( DeleteResponse ) deleteFuture.get( timeout, TimeUnit.MILLISECONDS );
                 
                 if ( response == null )
                 {
@@ -2051,15 +2145,20 @@
                     throw new LdapException( TIME_OUT_ERROR );
                 }
             }
+            catch( InterruptedException ie )
+            {
+                LOG.error( "Operation would have been cancelled", ie );
+                throw new LdapException( ie );
+            }
             catch( Exception e )
             {
                 LOG.error( NO_RESPONSE_ERROR );
+                futureMap.remove( newId );
                 unlockSession();
                 LdapException ldapException = new LdapException();
                 ldapException.initCause( e );
                 throw ldapException;
             }
-            
         }
         else
         {

Added: directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java
URL: http://svn.apache.org/viewvc/directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java?rev=793367&view=auto
==============================================================================
--- directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java (added)
+++ directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java Sun Jul 12 16:03:16 2009
@@ -0,0 +1,138 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+
+package org.apache.directory.shared.ldap.client.api.messages.future;
+
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.directory.shared.ldap.client.api.messages.AbstractResponseWithResult;
+import org.apache.directory.shared.ldap.client.api.messages.Response;
+
+
+/**
+ * A Future implementation used in LdapConnection operations.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class ResponseFuture implements Future<Response>
+{
+    /** the blocking queue holding LDAP responses */
+    private final BlockingQueue<Response> responseQueue;
+
+    /** flag to determine if this future is cancelled */
+    private boolean cancelled = false;
+
+    /** an object to indicate a cancelled/abandoned operation */
+    // 'poision pill shutdown' refer p. 155-156 Java Concurrency in Practice - Brian Goetz
+    private static final Response CANCEL_POISION = new AbstractResponseWithResult(){};
+
+
+    /**
+     * Creates a new instance of ResponseFuture.
+     *
+     * @param responseQueue a non-null blocking queue
+     */
+    public ResponseFuture( final BlockingQueue responseQueue )
+    {
+        if ( responseQueue == null )
+        {
+            throw new NullPointerException( "response queue cannot be null" );
+        }
+
+        this.responseQueue = responseQueue;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean cancel( boolean mayInterruptIfRunning )
+    {
+        if( cancelled )
+        {
+            return cancelled;
+        }
+        
+        cancelled = true;
+        responseQueue.add( CANCEL_POISION );
+        
+        return cancelled;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     * @throws InterruptedException if the operation has been cancelled by client
+     */
+    public Response get() throws InterruptedException, ExecutionException
+    {
+        Response resp = responseQueue.poll();
+        
+        if( resp == CANCEL_POISION )
+        {
+            throw new InterruptedException( "cancelled" );
+        }
+        
+        return resp;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     * @throws InterruptedException if the operation has been cancelled by client
+     */
+    public Response get( long timeout, TimeUnit unit ) throws InterruptedException, ExecutionException,
+        TimeoutException
+    {
+        Response resp = responseQueue.poll( timeout, unit );
+        
+        if( resp == CANCEL_POISION )
+        {
+            throw new InterruptedException( "cancelled" );
+        }
+        
+        return resp;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isCancelled()
+    {
+        return cancelled;
+    }
+
+
+    /**
+     * This operation is not supported in this implementation of Future
+     */
+    public boolean isDone()
+    {
+        throw new UnsupportedOperationException( "Operation not supported" );
+    }
+
+}