You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ka...@apache.org on 2009/07/12 18:03:17 UTC
svn commit: r793367 - in
/directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api:
LdapConnection.java messages/future/ResponseFuture.java
Author: kayyagari
Date: Sun Jul 12 16:03:16 2009
New Revision: 793367
URL: http://svn.apache.org/viewvc?rev=793367&view=rev
Log:
o added support for abandon request
o created a common Future class to use for all operations (as part of this, removed the references to BindFuture)
Added:
directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java
Modified:
directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java
Modified: directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java
URL: http://svn.apache.org/viewvc/directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java?rev=793367&r1=793366&r2=793367&view=diff
==============================================================================
--- directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java (original)
+++ directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/LdapConnection.java Sun Jul 12 16:03:16 2009
@@ -71,7 +71,7 @@
import org.apache.directory.shared.ldap.client.api.messages.SearchResultDone;
import org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry;
import org.apache.directory.shared.ldap.client.api.messages.SearchResultReference;
-import org.apache.directory.shared.ldap.client.api.messages.future.BindFuture;
+import org.apache.directory.shared.ldap.client.api.messages.future.ResponseFuture;
import org.apache.directory.shared.ldap.client.api.protocol.LdapProtocolCodecFactory;
import org.apache.directory.shared.ldap.codec.ControlCodec;
import org.apache.directory.shared.ldap.codec.LdapConstants;
@@ -207,6 +207,9 @@
/** a map to hold the response listeners based on the operation id */
private Map<Integer, OperationResponseListener> listenerMap = new ConcurrentHashMap<Integer, OperationResponseListener>();
+ /** a map to hold the ResponseFutures for all operations */
+ private Map<Integer, ResponseFuture> futureMap = new ConcurrentHashMap<Integer, ResponseFuture>();
+
/** list of controls supported by the server */
private List<String> supportedControls;
@@ -709,6 +712,9 @@
message.setProtocolOP( addReqCodec );
+ ResponseFuture addFuture = new ResponseFuture( addResponseQueue );
+ futureMap.put( newId, addFuture );
+
// Send the request to the server
ldapSession.write( message );
@@ -718,7 +724,7 @@
try
{
long timeout = getTimeout( addRequest.getTimeout() );
- response = addResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+ response = ( AddResponse ) addFuture.get( timeout, TimeUnit.MILLISECONDS );
if ( response == null )
{
@@ -727,9 +733,15 @@
throw new LdapException( TIME_OUT_ERROR );
}
}
+ catch( InterruptedException ie )
+ {
+ LOG.error( "Operation would have been cancelled", ie );
+ throw new LdapException( ie );
+ }
catch( Exception e )
{
LOG.error( NO_RESPONSE_ERROR );
+ futureMap.remove( newId );
unlockSession();
throw new LdapException( e );
}
@@ -818,6 +830,28 @@
// Send the request to the server
ldapSession.write( message );
+
+ // remove the associated listener if any
+ int abandonId = abandonRequest.getAbandonedMessageId();
+
+ ResponseFuture rf = futureMap.remove( abandonId );
+ if( rf != null )
+ {
+ LOG.debug( "sending cancel signal to future" );
+ rf.cancel( true );
+ }
+ else
+ {
+ LOG.error( "There is no future asscoiated with operation message ID {}, perhaps the operation would have been completed", abandonId );
+ }
+
+ OperationResponseListener listener = listenerMap.remove( abandonId );
+
+ if( listener != null )
+ {
+ LOG.error( "removed the listener associated with the abandoned operation with id {}", abandonId );
+ }
+
}
@@ -914,7 +948,7 @@
* @param bindRequest The BindRequest to send
* @param listener The listener
*/
- public BindFuture bind( BindRequest bindRequest, BindListener bindListener ) throws LdapException
+ public ResponseFuture bind( BindRequest bindRequest, BindListener bindListener ) throws LdapException
{
return bindAsyncInternal( bindRequest, bindListener );
}
@@ -929,7 +963,7 @@
private BindResponse bindInternal( BindRequest bindRequest ) throws LdapException
{
// Create the future to get the result
- BindFuture bindFuture = bindAsyncInternal( bindRequest, null );
+ ResponseFuture bindFuture = bindAsyncInternal( bindRequest, null );
// And get the result
try
@@ -938,7 +972,7 @@
long timeout = getTimeout( bindRequest.getTimeout() );
// Get the response, blocking
- BindResponse bindResponse = bindFuture.get( timeout, TimeUnit.MILLISECONDS );
+ BindResponse bindResponse = ( BindResponse ) bindFuture.get( timeout, TimeUnit.MILLISECONDS );
// Release the session lock
unlockSession();
@@ -951,7 +985,10 @@
catch ( TimeoutException te )
{
// Send an abandon request
- abandon( bindRequest.getMessageId() );
+ if( !bindFuture.isCancelled() )
+ {
+ abandon( bindRequest.getMessageId() );
+ }
// We didn't received anything : this is an error
LOG.error( "Bind failed : timeout occured" );
@@ -967,7 +1004,10 @@
unlockSession();
// Send an abandon request
- abandon( bindRequest.getMessageId() );
+ if( !bindFuture.isCancelled() )
+ {
+ abandon( bindRequest.getMessageId() );
+ }
throw ldapException;
}
}
@@ -979,7 +1019,7 @@
* @param bindRequest The BindRequest to send
* @param listener The listener (Can be null)
*/
- private BindFuture bindAsyncInternal( BindRequest bindRequest, BindListener bindListener ) throws LdapException
+ private ResponseFuture bindAsyncInternal( BindRequest bindRequest, BindListener bindListener ) throws LdapException
{
// First try to connect, if we aren't already connected.
connect();
@@ -1055,11 +1095,14 @@
LOG.debug( "-----------------------------------------------------------------" );
LOG.debug( "Sending request \n{}", bindMessage );
+ ResponseFuture rf = new ResponseFuture( bindResponseQueue );
+ futureMap.put( newId, rf );
+
// Send the request to the server
ldapSession.write( bindMessage );
// Return the associated future
- return new BindFuture( bindResponseQueue );
+ return rf;
}
@@ -1225,9 +1268,13 @@
LOG.debug( "-----------------------------------------------------------------" );
LOG.debug( "Sending request \n{}", searchMessage );
+ ResponseFuture searchFuture = new ResponseFuture( searchResponseQueue );
+ futureMap.put( newId, searchFuture );
+
// Send the request to the server
ldapSession.write( searchMessage );
+
if ( searchListener == null )
{
// Read the response, waiting for it if not available immediately
@@ -1240,7 +1287,7 @@
// We may have more than one response, so loop on the queue
do
{
- response = searchResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+ response = ( SearchResponse ) searchFuture.get( timeout, TimeUnit.MILLISECONDS );
// Check that we didn't get out because of a timeout
if ( response == null )
@@ -1249,7 +1296,7 @@
abandon( searchMessage.getSearchRequest().getMessageId() );
// We didn't received anything : this is an error
- LOG.error( "Bind failed : timeout occured" );
+ LOG.error( "Search failed : timeout occured" );
unlockSession();
throw new LdapException( TIME_OUT_ERROR );
}
@@ -1274,14 +1321,22 @@
return new ListCursor<SearchResponse>( searchResponses );
}
- catch ( InterruptedException ie )
+ catch( InterruptedException ie )
+ {
+ LOG.error( "Operation would have been cancelled", ie );
+ throw new LdapException( ie );
+ }
+ catch ( Exception e )
{
LOG.error( "The response queue has been emptied, no response will be find." );
LdapException ldapException = new LdapException();
- ldapException.initCause( ie );
+ ldapException.initCause( e );
// Send an abandon request
- abandon( searchMessage.getBindRequest().getMessageId() );
+ if( !searchFuture.isCancelled() )
+ {
+ abandon( searchMessage.getBindRequest().getMessageId() );
+ }
throw ldapException;
}
}
@@ -1375,8 +1430,17 @@
LOG.debug( "-------> {} Message received <-------", response.getMessageTypeName() );
- SearchListener searchListener = null;
+ // this check is necessary to prevent adding an abandoned operation's
+ // result(s) to corresponding queue
+ ResponseFuture rf = futureMap.get( response.getMessageId() );
+ if( rf == null )
+ {
+ LOG.error( "There is no future associated with the messageId {}, ignoring the message", response.getMessageId() );
+ return;
+ }
+ SearchListener searchListener = null;
+
switch ( response.getMessageType() )
{
case LdapConstants.ADD_RESPONSE :
@@ -1385,6 +1449,7 @@
addRespCodec.addControl( response.getCurrentControl() );
addRespCodec.setMessageId( response.getMessageId() );
+ futureMap.remove( addRespCodec.getMessageId() );
AddListener addListener = ( AddListener ) listenerMap.remove( addRespCodec.getMessageId() );
AddResponse addResp = convert( addRespCodec );
if( addListener != null )
@@ -1403,6 +1468,7 @@
bindResponseCodec.addControl( response.getCurrentControl() );
BindResponse bindResponse = convert( bindResponseCodec );
+ futureMap.remove( bindResponseCodec.getMessageId() );
// remove the listener from the listener map
BindListener bindListener = ( BindListener ) listenerMap.remove( bindResponseCodec.getMessageId() );
if ( bindListener != null )
@@ -1428,8 +1494,9 @@
delRespCodec.setMessageId( response.getMessageId() );
delRespCodec.addControl( response.getCurrentControl() );
DeleteResponse delResp = convert( delRespCodec );
- DeleteListener delListener = ( DeleteListener ) listenerMap.remove( delResp.getMessageId() );
+ futureMap.remove( delResp.getMessageId() );
+ DeleteListener delListener = ( DeleteListener ) listenerMap.remove( delResp.getMessageId() );
if( delListener != null )
{
delListener.entryDeleted( this, delResp );
@@ -1470,8 +1537,9 @@
ModifyResponseCodec modRespCodec = response.getModifyResponse();
modRespCodec.setMessageId( response.getMessageId() );
modRespCodec.addControl( response.getCurrentControl() );
-
ModifyResponse modResp = convert( modRespCodec );
+
+ futureMap.remove( modResp.getMessageId() );
ModifyListener modListener = ( ModifyListener ) listenerMap.remove( modResp.getMessageId() );
if( modListener != null )
@@ -1490,6 +1558,8 @@
modDnCodec.addControl( response.getCurrentControl() );
modDnCodec.setMessageId( response.getMessageId() );
ModifyDnResponse modDnResp = convert( modDnCodec );
+
+ futureMap.remove( modDnCodec.getMessageId() );
ModifyDnListener modDnListener = ( ModifyDnListener ) listenerMap.remove( modDnCodec.getMessageId() );
if( modDnListener != null )
{
@@ -1509,6 +1579,8 @@
searchResultDoneCodec.setMessageId( response.getMessageId() );
searchResultDoneCodec.addControl( response.getCurrentControl() );
SearchResultDone srchDone = convert( searchResultDoneCodec );
+
+ futureMap.remove( searchResultDoneCodec.getMessageId() );
// search listener has to be removed from listener map only here
searchListener = ( SearchListener ) listenerMap.remove( searchResultDoneCodec.getMessageId() );
if ( searchListener != null )
@@ -1531,6 +1603,7 @@
SearchResultEntry srchEntry = convert( searchResultEntryCodec );
searchListener = ( SearchListener ) listenerMap.get( searchResultEntryCodec.getMessageId() );
+
if ( searchListener != null )
{
searchListener.entryFound( this, srchEntry );
@@ -1623,16 +1696,19 @@
modifyMessage.setProtocolOP( modReqCodec );
setControls( modRequest.getControls(), modifyMessage );
+
+ ResponseFuture modifyFuture = new ResponseFuture( modifyResponseQueue );
+ futureMap.put( newId, modifyFuture );
ldapSession.write( modifyMessage );
-
+
ModifyResponse response = null;
if( listener == null )
{
try
{
long timeout = getTimeout( modRequest.getTimeout() );
- response = modifyResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+ response = ( ModifyResponse ) modifyFuture.get( timeout, TimeUnit.MILLISECONDS );
if ( response == null )
{
@@ -1641,9 +1717,15 @@
throw new LdapException( TIME_OUT_ERROR );
}
}
+ catch( InterruptedException ie )
+ {
+ LOG.error( "Operation would have been cancelled", ie );
+ throw new LdapException( ie );
+ }
catch( Exception e )
{
LOG.error( NO_RESPONSE_ERROR );
+ futureMap.remove( newId );
unlockSession();
throw new LdapException( e );
}
@@ -1800,6 +1882,9 @@
modifyDnMessage.setProtocolOP( modDnCodec );
setControls( modDnRequest.getControls(), modifyDnMessage );
+ ResponseFuture modifyDNFuture = new ResponseFuture( modifyDNResponseQueue );
+ futureMap.put( newId, modifyDNFuture );
+
ldapSession.write( modifyDnMessage );
if( listener == null )
@@ -1808,7 +1893,7 @@
try
{
long timeout = getTimeout( modDnRequest.getTimeout() );
- response = modifyDNResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+ response = ( ModifyDnResponse ) modifyDNFuture.get( timeout, TimeUnit.MILLISECONDS );
if ( response == null )
{
@@ -1817,9 +1902,15 @@
throw new LdapException( TIME_OUT_ERROR );
}
}
+ catch( InterruptedException ie )
+ {
+ LOG.error( "Operation would have been cancelled", ie );
+ throw new LdapException( ie );
+ }
catch( Exception e )
{
LOG.error( NO_RESPONSE_ERROR );
+ futureMap.remove( newId );
unlockSession();
LdapException ldapException = new LdapException();
ldapException.initCause( e );
@@ -2034,6 +2125,9 @@
deleteMessage.setProtocolOP( delCodec );
setControls( delRequest.getControls(), deleteMessage );
+ ResponseFuture deleteFuture = new ResponseFuture( deleteResponseQueue );
+ futureMap.put( newId, deleteFuture );
+
ldapSession.write( deleteMessage );
DeleteResponse response = null;
@@ -2042,7 +2136,7 @@
try
{
long timeout = getTimeout( delRequest.getTimeout() );
- response = deleteResponseQueue.poll( timeout, TimeUnit.MILLISECONDS );
+ response = ( DeleteResponse ) deleteFuture.get( timeout, TimeUnit.MILLISECONDS );
if ( response == null )
{
@@ -2051,15 +2145,20 @@
throw new LdapException( TIME_OUT_ERROR );
}
}
+ catch( InterruptedException ie )
+ {
+ LOG.error( "Operation would have been cancelled", ie );
+ throw new LdapException( ie );
+ }
catch( Exception e )
{
LOG.error( NO_RESPONSE_ERROR );
+ futureMap.remove( newId );
unlockSession();
LdapException ldapException = new LdapException();
ldapException.initCause( e );
throw ldapException;
}
-
}
else
{
Added: directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java
URL: http://svn.apache.org/viewvc/directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java?rev=793367&view=auto
==============================================================================
--- directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java (added)
+++ directory/shared/trunk/client-api/src/main/java/org/apache/directory/shared/ldap/client/api/messages/future/ResponseFuture.java Sun Jul 12 16:03:16 2009
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.directory.shared.ldap.client.api.messages.future;
+
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.directory.shared.ldap.client.api.messages.AbstractResponseWithResult;
+import org.apache.directory.shared.ldap.client.api.messages.Response;
+
+
+/**
+ * A Future implementation used in LdapConnection operations.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class ResponseFuture implements Future<Response>
+{
+    /** the blocking queue holding LDAP responses */
+    private final BlockingQueue<Response> responseQueue;
+
+    /** flag to determine if this future is cancelled; volatile as cancel() may run on another thread */
+    private volatile boolean cancelled = false;
+
+    /** an object to indicate a cancelled/abandoned operation */
+    // 'poison pill shutdown' refer p. 155-156 Java Concurrency in Practice - Brian Goetz
+    private static final Response CANCEL_POISON = new AbstractResponseWithResult(){};
+
+
+    /**
+     * Creates a new instance of ResponseFuture.
+     *
+     * @param responseQueue a non-null blocking queue (raw type kept so the signature is unchanged)
+     * @throws NullPointerException if responseQueue is null
+     */
+    public ResponseFuture( final BlockingQueue responseQueue )
+    {
+        if ( responseQueue == null )
+        {
+            throw new NullPointerException( "response queue cannot be null" );
+        }
+
+        this.responseQueue = responseQueue;
+    }
+
+
+    /**
+     * Marks this future cancelled and queues the cancel marker, once, to wake a blocked get().
+     *
+     * @return always true
+     */
+    public synchronized boolean cancel( boolean mayInterruptIfRunning )
+    {
+        if ( !cancelled )
+        {
+            cancelled = true;
+            responseQueue.add( CANCEL_POISON );
+        }
+
+        return true;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     * @throws InterruptedException if the operation has been cancelled by client
+     */
+    public Response get() throws InterruptedException, ExecutionException
+    {
+        // take() blocks until something is queued, as Future.get() requires (poll() would return null at once)
+        return checkNotCancelled( responseQueue.take() );
+    }
+
+
+    /**
+     * Waits at most the given time; returns null on timeout rather than throwing TimeoutException.
+     * @throws InterruptedException if the operation has been cancelled by client
+     */
+    public Response get( long timeout, TimeUnit unit ) throws InterruptedException, ExecutionException,
+        TimeoutException
+    {
+        return checkNotCancelled( responseQueue.poll( timeout, unit ) );
+    }
+
+
+    /** Throws InterruptedException for the cancel marker; returns any other value (null included) as is. */
+    private static Response checkNotCancelled( Response resp ) throws InterruptedException
+    {
+        if ( resp == CANCEL_POISON )
+        {
+            throw new InterruptedException( "cancelled" );
+        }
+
+        return resp;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isCancelled()
+    {
+        return cancelled;
+    }
+
+
+    /**
+     * This operation is not supported in this implementation of Future
+     */
+    public boolean isDone()
+    {
+        throw new UnsupportedOperationException( "Operation not supported" );
+    }
+
+}