You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/19 18:48:23 UTC

svn commit: r1618896 - in /qpid/jms/trunk/src/main/java/org/apache/qpid/jms: engine/ impl/

Author: robbie
Date: Tue Aug 19 16:48:22 2014
New Revision: 1618896

URL: http://svn.apache.org/r1618896
Log:
QPIDJMS-27: initial work on using events to signal operation completion

Added:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java Tue Aug 19 16:48:22 2014
@@ -117,7 +117,6 @@ public class AmqpConnection
 
         AmqpSession amqpSession = new AmqpSession(this, session);
         session.setContext(amqpSession);
-        session.open();
 
         addPendingSession(session);
 
@@ -344,7 +343,7 @@ public class AmqpConnection
             if(session.getRemoteState() != EndpointState.UNINITIALIZED)
             {
                 AmqpSession amqpSession = (AmqpSession) session.getContext();
-                amqpSession.setEstablished();
+                amqpSession.opened();
                 _pendingSessions.remove(session);//TODO: delete pending sessions?
             }
         };
@@ -355,7 +354,7 @@ public class AmqpConnection
             if(session.getRemoteState() == EndpointState.CLOSED)
             {
                 AmqpSession amqpSession = (AmqpSession) session.getContext();
-                amqpSession.setClosed();
+                amqpSession.closed();
                 _pendingCloseSessions.remove(session);//TODO: delete pending close sessions?
             }
         }
@@ -370,11 +369,16 @@ public class AmqpConnection
                 AmqpLink amqpLink = (AmqpLink) link.getContext();
                 if(getRemoteNode(link) != null)
                 {
-                    amqpLink.setEstablished();
+                    amqpLink.opened();
                 }
                 else
                 {
+                    // As per AMQP 1.0 specification, section 2.6.3 Establishing Or Resuming A Link,
+                    // the returned attach may contain a null source/target if the peer link endpoint
+                    // had no associated local terminus and chooses not to create one.
                     amqpLink.setLinkError();
+                    //TODO: the above is just a marker, we need to trigger any waiters, e.g:
+                    //amqpLink.failed(cause);
                 }
                 _pendingLinks.remove(link);//TODO: delete pending links?
             }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Tue Aug 19 16:48:22 2014
@@ -25,14 +25,13 @@ import java.util.logging.Logger;
 
 import org.apache.qpid.proton.engine.Link;
 
-public abstract class AmqpLink
+public abstract class AmqpLink extends AmqpResource
 {
     private static Logger _logger = Logger.getLogger("qpid.jms-client.link");
 
     private final AmqpConnection _amqpConnection;
     private final AmqpSession _amqpSession;
     private final Link _protonLink;
-    private boolean _established;
     private boolean _linkError;
     private boolean _closed;
 
@@ -44,16 +43,6 @@ public abstract class AmqpLink
         _amqpConnection = amqpConnection;
     }
 
-    public boolean isEstablished()
-    {
-        return _established;
-    }
-
-    void setEstablished()
-    {
-        _established = true;
-    }
-
     public boolean getLinkError()
     {
         return _linkError;
@@ -96,4 +85,15 @@ public abstract class AmqpLink
         return _closed;
     }
 
+    @Override
+    protected void doOpen()
+    {
+        _protonLink.open();
+    }
+
+    @Override
+    protected void doClose()
+    {
+        _protonLink.close();
+    }
 }

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java?rev=1618896&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java Tue Aug 19 16:48:22 2014
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+/**
+ * Base class for engine resources whose open and close operations are
+ * requested through {@link #open} / {@link #close} and reported complete
+ * through {@link #opened()} / {@link #closed()}.
+ * <p>
+ * At most one open request and one close request are remembered at a time;
+ * a later call to open/close replaces the previously stored request.
+ */
+public abstract class AmqpResource
+{
+    private AmqpResourceRequest<?> _openRequest;
+    private AmqpResourceRequest<?> _closeRequest;
+
+    /**
+     * Stores the request to be completed by {@link #opened()}, then calls
+     * {@link #doOpen()}.
+     *
+     * @param request the request to signal once the resource is opened
+     */
+    public void open(AmqpResourceRequest<?> request)
+    {
+        _openRequest = request;
+        doOpen();
+    }
+
+    /**
+     * Signals success on the stored open request, if there is one, and then
+     * forgets it. Does nothing when no open request is stored.
+     */
+    public void opened()
+    {
+        if(_openRequest == null)
+        {
+            return;
+        }
+
+        _openRequest.onSuccess(null);
+        _openRequest = null;
+    }
+
+    /**
+     * Stores the request to be completed by {@link #closed()}, then calls
+     * {@link #doClose()}.
+     *
+     * @param request the request to signal once the resource is closed
+     */
+    public void close(AmqpResourceRequest<?> request)
+    {
+        _closeRequest = request;
+        doClose();
+    }
+
+    /**
+     * Signals success on the stored close request, if there is one, and then
+     * forgets it. Does nothing when no close request is stored.
+     */
+    public void closed()
+    {
+        if(_closeRequest == null)
+        {
+            return;
+        }
+
+        _closeRequest.onSuccess(null);
+        _closeRequest = null;
+    }
+
+    /** Performs the resource-specific open; invoked by {@link #open}. */
+    protected abstract void doOpen();
+
+    /** Performs the resource-specific close; invoked by {@link #close}. */
+    protected abstract void doClose();
+}

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java?rev=1618896&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java Tue Aug 19 16:48:22 2014
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A one-shot completion handle for an asynchronous resource operation.
+ * <p>
+ * The party performing the operation calls {@link #onSuccess(Object)} or
+ * {@link #onFailure(Throwable)}; the party waiting for it blocks in
+ * {@link #getResult()} until one of those has been called.
+ *
+ * @param <T> the type of the result produced on success
+ */
+public class AmqpResourceRequest<T>
+{
+    private final CountDownLatch _latch;
+    private volatile boolean _success;//TODO: delete?
+    // _result and _cause are written before the latch is counted down and read
+    // only after await() returns, so the latch provides the needed visibility.
+    private T _result;
+    private Throwable _cause;
+
+    public AmqpResourceRequest()
+    {
+        _latch = new CountDownLatch(1);
+    }
+
+    /**
+     * @return true once either {@link #onSuccess(Object)} or
+     *         {@link #onFailure(Throwable)} has been called
+     */
+    public boolean isComplete()
+    {
+        return _latch.getCount() == 0;
+    }
+
+    /**
+     * @return true if the most recent completion was a success; false if it
+     *         was a failure or the request has not completed yet
+     */
+    public boolean isSuccess()
+    {
+        return _success;
+    }
+
+    /**
+     * Marks the request as successfully completed and releases any waiters.
+     *
+     * @param result the value to hand to {@link #getResult()}, may be null
+     */
+    public void onSuccess(T result)
+    {
+        // Record the result before releasing waiters so getResult() returns it.
+        _result = result;
+        _success = true;
+        markCompletion();
+    }
+
+    /**
+     * Marks the request as failed and releases any waiters.
+     *
+     * @param cause the reason for the failure, reported by {@link #getResult()}
+     */
+    public void onFailure(Throwable cause)
+    {
+        _success = false;
+        _cause = cause;
+        markCompletion();
+    }
+
+    /**
+     * Blocks until the request completes and returns its result.
+     *
+     * @return the value passed to {@link #onSuccess(Object)}
+     * @throws IOException if the wait is interrupted (the thread's interrupt
+     *         status is restored), or if the request completed with a
+     *         non-null failure cause
+     */
+    public T getResult() throws IOException
+    {
+        //TODO: timeout
+        try
+        {
+            _latch.await();
+        }
+        catch (InterruptedException e)
+        {
+            // await() cleared the interrupt status when it threw; set it again
+            // so callers further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new IOException("Unable to retrieve result due to interruption", e);
+        }
+
+        if(_cause !=null)
+        {
+            throw new IOException("Unable to retrieve result due to failure", _cause);
+        }
+
+        return _result;
+    }
+
+    private void markCompletion()
+    {
+        _latch.countDown();
+    }
+}

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java Tue Aug 19 16:48:22 2014
@@ -31,11 +31,10 @@ import org.apache.qpid.proton.engine.Rec
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 
-public class AmqpSession
+public class AmqpSession extends AmqpResource
 {
     private final AmqpConnection _amqpConnection;
     private final Session _protonSession;
-    private boolean _established;
 
     private boolean _closed;
 
@@ -45,27 +44,6 @@ public class AmqpSession
         _protonSession = protonSession;
     }
 
-    public boolean isEstablished()
-    {
-        return _established;
-    }
-
-    void setEstablished()
-    {
-        _established = true;
-    }
-
-    public void close()
-    {
-        _protonSession.close();
-        _amqpConnection.addPendingCloseSession(_protonSession);
-    }
-
-    void setClosed()
-    {
-        _closed = true;
-    }
-
     AmqpConnection getAmqpConnection()
     {
         return _amqpConnection;
@@ -130,4 +108,22 @@ public class AmqpSession
     {
         return _protonSession.getCondition();
     }
+
+    /** Opens the underlying proton session. */
+    @Override
+    protected void doOpen()
+    {
+        _protonSession.open();
+    }
+
+    /**
+     * Closes the underlying proton session and hands it to
+     * {@code AmqpConnection.addPendingCloseSession}.
+     */
+    @Override
+    protected void doClose()
+    {
+        _protonSession.close();
+        _amqpConnection.addPendingCloseSession(_protonSession);
+    }
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java Tue Aug 19 16:48:22 2014
@@ -36,6 +36,7 @@ import javax.jms.Topic;
 
 import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpConnectionDriverNetty;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
 import org.apache.qpid.jms.engine.AmqpSession;
 
 /**
@@ -104,6 +105,11 @@ public class ConnectionImpl implements C
         _messageIdHelper = new MessageIdHelper();
     }
 
+    AmqpConnection getAmqpConnection()
+    {
+        return _amqpConnection;
+    }
+
     void waitUntil(Predicate condition, long timeoutMillis) throws JmsTimeoutException, JmsInterruptedException
     {
         long deadline = timeoutMillis < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeoutMillis;
@@ -242,6 +248,18 @@ public class ConnectionImpl implements C
         return _username;
     }
 
+    void waitForResult(AmqpResourceRequest<Void> request, String message) throws JMSException
+    {
+        try
+        {
+            request.getResult();
+        }
+        catch (IOException e)
+        {
+            throw new QpidJmsException(message, e);
+        }
+    }
+
     //======= JMS Methods =======
 
 
@@ -294,11 +312,18 @@ public class ConnectionImpl implements C
         lock();
         try
         {
-            AmqpSession amqpSession = _amqpConnection.createSession();
+            AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
 
-            SessionImpl session = new SessionImpl(acknowledgeMode, amqpSession, this, _destinationHelper, _messageIdHelper);
-            stateChanged();
-            session.establish();
+            SessionImpl session = null;
+            synchronized (_amqpConnection)
+            {
+                AmqpSession amqpSession = _amqpConnection.createSession();
+                session = new SessionImpl(acknowledgeMode, amqpSession, this, _destinationHelper, _messageIdHelper);
+                session.open(request);
+                stateChanged();
+            }
+
+            waitForResult(request, "Exception while creating session");
 
             return session;
         }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java Tue Aug 19 16:48:22 2014
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 
 import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpLink;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
 
 public class LinkImpl
 {
@@ -36,22 +37,6 @@ public class LinkImpl
         _amqpLink = amqpLink;
     }
 
-    public void establish() throws LinkException, JmsTimeoutException, JmsInterruptedException
-    {
-        _connectionImpl.waitUntil(new SimplePredicate("Link is established or failed", _amqpLink)
-        {
-            @Override
-            public boolean test()
-            {
-                return _amqpLink.isEstablished() || _amqpLink.getLinkError();
-            }
-        }, AmqpConnection.TIMEOUT);
-        if(!_amqpLink.isEstablished())
-        {
-            throw new LinkException("Failed to establish link " + _amqpLink);
-        }
-    }
-
     public void close() throws JMSException
     {
         _connectionImpl.lock();
@@ -79,4 +64,8 @@ public class LinkImpl
         return _connectionImpl;
     }
 
+    public void open(AmqpResourceRequest<Void> request) throws JmsTimeoutException, JmsInterruptedException
+    {
+        _amqpLink.open(request);
+    }
 }
\ No newline at end of file

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Tue Aug 19 16:48:22 2014
@@ -42,8 +42,8 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
-import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpReceiver;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
 import org.apache.qpid.jms.engine.AmqpSender;
 import org.apache.qpid.jms.engine.AmqpSession;
 
@@ -66,18 +66,6 @@ public class SessionImpl implements Sess
         _messageIdHelper = messageIdHelper;
     }
 
-     void establish() throws JmsTimeoutException, JmsInterruptedException
-    {
-        _connectionImpl.waitUntil(new SimplePredicate("Session established", _amqpSession)
-        {
-            @Override
-            public boolean test()
-            {
-                return _amqpSession.isEstablished();
-            }
-        }, AmqpConnection.TIMEOUT);
-    }
-
     ConnectionImpl getConnectionImpl()
     {
         return _connectionImpl;
@@ -88,10 +76,19 @@ public class SessionImpl implements Sess
         _connectionImpl.lock();
         try
         {
-            AmqpSender amqpSender = _amqpSession.createAmqpSender(address);
-            SenderImpl sender = new SenderImpl(this, _connectionImpl, amqpSender, destination);
-            _connectionImpl.stateChanged();
-            sender.establish();
+            AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
+
+            SenderImpl sender = null;
+            synchronized (_connectionImpl.getAmqpConnection())
+            {
+                AmqpSender amqpSender = _amqpSession.createAmqpSender(address);
+                sender = new SenderImpl(this, _connectionImpl, amqpSender, destination);
+                sender.open(request);
+                _connectionImpl.stateChanged();
+            }
+
+            _connectionImpl.waitForResult(request, "Exception while creating sender to: " + address);
+
             return sender;
         }
         finally
@@ -105,10 +102,19 @@ public class SessionImpl implements Sess
         _connectionImpl.lock();
         try
         {
-            AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(address);
-            ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver, recieverDestination);
-            _connectionImpl.stateChanged();
-            receiver.establish();
+            AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
+
+            ReceiverImpl receiver = null;
+            AmqpReceiver amqpReceiver = null;
+            synchronized (_connectionImpl.getAmqpConnection())
+            {
+                amqpReceiver = _amqpSession.createAmqpReceiver(address);
+                receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver, recieverDestination);
+                receiver.open(request);
+                _connectionImpl.stateChanged();
+            }
+
+            _connectionImpl.waitForResult(request, "Exception while creating receiver to: " + address);
 
             if(_connectionImpl.isStarted())
             {
@@ -136,6 +142,10 @@ public class SessionImpl implements Sess
         return _destinationHelper;
     }
 
+    public void open(AmqpResourceRequest<Void> request) throws JmsTimeoutException, JmsInterruptedException
+    {
+        _amqpSession.open(request);
+    }
 
     //======= JMS Methods =======
 
@@ -146,20 +156,16 @@ public class SessionImpl implements Sess
         _connectionImpl.lock();
         try
         {
-            _amqpSession.close();
-            _connectionImpl.stateChanged();
-            while(!_amqpSession.isClosed())
+            AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
+
+            synchronized (_connectionImpl.getAmqpConnection())
             {
-                _connectionImpl.waitUntil(new SimplePredicate("Session is closed", _amqpSession)
-                {
-                    @Override
-                    public boolean test()
-                    {
-                        return _amqpSession.isClosed();
-                    }
-                }, AmqpConnection.TIMEOUT);
+                _amqpSession.close(request);
+                _connectionImpl.stateChanged();
             }
 
+            _connectionImpl.waitForResult(request, "Exception while closing session");
+
             if(_amqpSession.getSessionError().getCondition() != null)
             {
                 throw new ConnectionException("Session close failed: " + _amqpSession.getSessionError());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org