You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/19 18:48:23 UTC
svn commit: r1618896 - in /qpid/jms/trunk/src/main/java/org/apache/qpid/jms:
engine/ impl/
Author: robbie
Date: Tue Aug 19 16:48:22 2014
New Revision: 1618896
URL: http://svn.apache.org/r1618896
Log:
QPIDJMS-27: initial work on using events to signal operation completion
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java Tue Aug 19 16:48:22 2014
@@ -117,7 +117,6 @@ public class AmqpConnection
AmqpSession amqpSession = new AmqpSession(this, session);
session.setContext(amqpSession);
- session.open();
addPendingSession(session);
@@ -344,7 +343,7 @@ public class AmqpConnection
if(session.getRemoteState() != EndpointState.UNINITIALIZED)
{
AmqpSession amqpSession = (AmqpSession) session.getContext();
- amqpSession.setEstablished();
+ amqpSession.opened();
_pendingSessions.remove(session);//TODO: delete pending sessions?
}
};
@@ -355,7 +354,7 @@ public class AmqpConnection
if(session.getRemoteState() == EndpointState.CLOSED)
{
AmqpSession amqpSession = (AmqpSession) session.getContext();
- amqpSession.setClosed();
+ amqpSession.closed();
_pendingCloseSessions.remove(session);//TODO: delete pending close sessions?
}
}
@@ -370,11 +369,16 @@ public class AmqpConnection
AmqpLink amqpLink = (AmqpLink) link.getContext();
if(getRemoteNode(link) != null)
{
- amqpLink.setEstablished();
+ amqpLink.opened();
}
else
{
+ // As per AMQP 1.0 specification, section 2.6.3 Establishing Or Resuming A Link,
+ // the returned attach may contain a null source/target if the peer link endpoint
+ // had no associated local terminus and chooses not to create one.
amqpLink.setLinkError();
+ //TODO: the above is just a marker, we need to trigger any waiters, e.g:
+ //amqpLink.failed(cause);
}
_pendingLinks.remove(link);//TODO: delete pending links?
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Tue Aug 19 16:48:22 2014
@@ -25,14 +25,13 @@ import java.util.logging.Logger;
import org.apache.qpid.proton.engine.Link;
-public abstract class AmqpLink
+public abstract class AmqpLink extends AmqpResource
{
private static Logger _logger = Logger.getLogger("qpid.jms-client.link");
private final AmqpConnection _amqpConnection;
private final AmqpSession _amqpSession;
private final Link _protonLink;
- private boolean _established;
private boolean _linkError;
private boolean _closed;
@@ -44,16 +43,6 @@ public abstract class AmqpLink
_amqpConnection = amqpConnection;
}
- public boolean isEstablished()
- {
- return _established;
- }
-
- void setEstablished()
- {
- _established = true;
- }
-
public boolean getLinkError()
{
return _linkError;
@@ -96,4 +85,15 @@ public abstract class AmqpLink
return _closed;
}
+    /**
+     * Opens the underlying Proton link. Called from
+     * {@link AmqpResource#open(AmqpResourceRequest)} once the open request
+     * has been recorded.
+     */
+    @Override
+    protected void doOpen()
+    {
+        _protonLink.open();
+    }
+
+    /**
+     * Closes the underlying Proton link. Called from
+     * {@link AmqpResource#close(AmqpResourceRequest)} once the close request
+     * has been recorded.
+     */
+    @Override
+    protected void doClose()
+    {
+        _protonLink.close();
+    }
}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java?rev=1618896&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResource.java Tue Aug 19 16:48:22 2014
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+/**
+ * Base class for resources whose open and close operations are started
+ * locally and reported as complete later through {@link #opened()} and
+ * {@link #closed()}.
+ *
+ * At most one open request and one close request are tracked at a time; a
+ * later call to {@link #open} or {@link #close} replaces the tracked request.
+ */
+public abstract class AmqpResource
+{
+    private AmqpResourceRequest<?> _openRequest;
+    private AmqpResourceRequest<?> _closeRequest;
+
+    /** Performs the resource-specific work needed to start opening. */
+    protected abstract void doOpen();
+
+    /** Performs the resource-specific work needed to start closing. */
+    protected abstract void doClose();
+
+    /**
+     * Records the request to be completed by {@link #opened()} and then
+     * starts the open via {@link #doOpen()}.
+     *
+     * @param request the request to signal once the open completes
+     */
+    public void open(AmqpResourceRequest<?> request)
+    {
+        _openRequest = request;
+        doOpen();
+    }
+
+    /**
+     * Signals success on the recorded open request, if any, and forgets it.
+     */
+    public void opened()
+    {
+        signalSuccess(_openRequest);
+        _openRequest = null;
+    }
+
+    /**
+     * Records the request to be completed by {@link #closed()} and then
+     * starts the close via {@link #doClose()}.
+     *
+     * @param request the request to signal once the close completes
+     */
+    public void close(AmqpResourceRequest<?> request)
+    {
+        _closeRequest = request;
+        doClose();
+    }
+
+    /**
+     * Signals success on the recorded close request, if any, and forgets it.
+     */
+    public void closed()
+    {
+        signalSuccess(_closeRequest);
+        _closeRequest = null;
+    }
+
+    /** Completes the given request with a null result; does nothing for null. */
+    private static void signalSuccess(AmqpResourceRequest<?> request)
+    {
+        if(request != null)
+        {
+            request.onSuccess(null);
+        }
+    }
+}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java?rev=1618896&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpResourceRequest.java Tue Aug 19 16:48:22 2014
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A one-shot handle on the outcome of an asynchronous operation.
+ *
+ * The party performing the operation calls {@link #onSuccess(Object)} or
+ * {@link #onFailure(Throwable)}; a caller interested in the outcome blocks
+ * in {@link #getResult()} until one of those has been called.
+ *
+ * @param <T> the type of the result produced on success
+ */
+public class AmqpResourceRequest<T>
+{
+    private final CountDownLatch _latch;
+    private volatile boolean _success;//TODO: delete?
+    private T _result;
+    private Throwable _cause;
+
+    public AmqpResourceRequest()
+    {
+        _latch = new CountDownLatch(1);
+    }
+
+    /**
+     * @return true once either completion method has been called
+     */
+    public boolean isComplete()
+    {
+        return _latch.getCount() == 0;
+    }
+
+    /**
+     * @return true if the most recent completion call was {@link #onSuccess(Object)}
+     */
+    public boolean isSuccess()
+    {
+        return _success;
+    }
+
+    /**
+     * Completes the request successfully and releases any thread waiting in
+     * {@link #getResult()}.
+     *
+     * @param result the value to hand to {@link #getResult()}, may be null
+     */
+    public void onSuccess(T result)
+    {
+        // Keep the value so getResult() can actually return it.
+        _result = result;
+        _success = true;
+        markCompletion();
+    }
+
+    /**
+     * Completes the request as failed and releases any thread waiting in
+     * {@link #getResult()}.
+     *
+     * @param cause the reason for the failure, may be null
+     */
+    public void onFailure(Throwable cause)
+    {
+        _success = false;
+        _cause = cause;
+        markCompletion();
+    }
+
+    /**
+     * Waits for the request to complete and returns its result.
+     *
+     * @return the value passed to {@link #onSuccess(Object)}
+     * @throws IOException if the wait is interrupted (the thread's interrupt
+     *         status is restored before throwing) or if the request failed,
+     *         in which case the failure cause, if any, is attached
+     */
+    public T getResult() throws IOException
+    {
+        //TODO: timeout
+        try
+        {
+            _latch.await();
+        }
+        catch (InterruptedException e)
+        {
+            // await() cleared the flag when it threw; set it again so code
+            // further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new IOException("Unable to retrieve result due to interruption", e);
+        }
+
+        // A failure reported without a cause must not be mistaken for success.
+        if(!_success || _cause != null)
+        {
+            throw new IOException("Unable to retrieve result due to failure", _cause);
+        }
+
+        return _result;
+    }
+
+    private void markCompletion()
+    {
+        _latch.countDown();
+    }
+}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java Tue Aug 19 16:48:22 2014
@@ -31,11 +31,10 @@ import org.apache.qpid.proton.engine.Rec
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
-public class AmqpSession
+public class AmqpSession extends AmqpResource
{
private final AmqpConnection _amqpConnection;
private final Session _protonSession;
- private boolean _established;
private boolean _closed;
@@ -45,27 +44,6 @@ public class AmqpSession
_protonSession = protonSession;
}
- public boolean isEstablished()
- {
- return _established;
- }
-
- void setEstablished()
- {
- _established = true;
- }
-
- public void close()
- {
- _protonSession.close();
- _amqpConnection.addPendingCloseSession(_protonSession);
- }
-
- void setClosed()
- {
- _closed = true;
- }
-
AmqpConnection getAmqpConnection()
{
return _amqpConnection;
@@ -130,4 +108,15 @@ public class AmqpSession
{
return _protonSession.getCondition();
}
+
+    /**
+     * Opens the underlying Proton session. Called from
+     * {@link AmqpResource#open(AmqpResourceRequest)} once the open request
+     * has been recorded.
+     */
+    @Override
+    protected void doOpen()
+    {
+        _protonSession.open();
+    }
+
+    /**
+     * Closes the underlying Proton session and registers it with the owning
+     * connection as pending close. Called from
+     * {@link AmqpResource#close(AmqpResourceRequest)} once the close request
+     * has been recorded.
+     */
+    @Override
+    protected void doClose()
+    {
+        _protonSession.close();
+        _amqpConnection.addPendingCloseSession(_protonSession);
+    }
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java Tue Aug 19 16:48:22 2014
@@ -36,6 +36,7 @@ import javax.jms.Topic;
import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpConnectionDriverNetty;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
import org.apache.qpid.jms.engine.AmqpSession;
/**
@@ -104,6 +105,11 @@ public class ConnectionImpl implements C
_messageIdHelper = new MessageIdHelper();
}
+    /**
+     * @return the engine-level connection backing this JMS connection
+     */
+    AmqpConnection getAmqpConnection()
+    {
+        return _amqpConnection;
+    }
+
void waitUntil(Predicate condition, long timeoutMillis) throws JmsTimeoutException, JmsInterruptedException
{
long deadline = timeoutMillis < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeoutMillis;
@@ -242,6 +248,18 @@ public class ConnectionImpl implements C
return _username;
}
+    /**
+     * Waits for the given request to complete, translating a failure into a
+     * JMS exception.
+     *
+     * @param request the request to wait on
+     * @param message the message for the exception thrown if the wait fails
+     * @throws JMSException a {@code QpidJmsException} wrapping the
+     *         {@code IOException} raised by {@code request.getResult()}
+     */
+    void waitForResult(AmqpResourceRequest<Void> request, String message) throws JMSException
+    {
+        try
+        {
+            request.getResult();
+        }
+        catch (IOException ioe)
+        {
+            throw new QpidJmsException(message, ioe);
+        }
+    }
+
//======= JMS Methods =======
@@ -294,11 +312,18 @@ public class ConnectionImpl implements C
lock();
try
{
- AmqpSession amqpSession = _amqpConnection.createSession();
+ AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
- SessionImpl session = new SessionImpl(acknowledgeMode, amqpSession, this, _destinationHelper, _messageIdHelper);
- stateChanged();
- session.establish();
+ SessionImpl session = null;
+ synchronized (_amqpConnection)
+ {
+ AmqpSession amqpSession = _amqpConnection.createSession();
+ session = new SessionImpl(acknowledgeMode, amqpSession, this, _destinationHelper, _messageIdHelper);
+ session.open(request);
+ stateChanged();
+ }
+
+ waitForResult(request, "Exception while creating session");
return session;
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java Tue Aug 19 16:48:22 2014
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpLink;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
public class LinkImpl
{
@@ -36,22 +37,6 @@ public class LinkImpl
_amqpLink = amqpLink;
}
- public void establish() throws LinkException, JmsTimeoutException, JmsInterruptedException
- {
- _connectionImpl.waitUntil(new SimplePredicate("Link is established or failed", _amqpLink)
- {
- @Override
- public boolean test()
- {
- return _amqpLink.isEstablished() || _amqpLink.getLinkError();
- }
- }, AmqpConnection.TIMEOUT);
- if(!_amqpLink.isEstablished())
- {
- throw new LinkException("Failed to establish link " + _amqpLink);
- }
- }
-
public void close() throws JMSException
{
_connectionImpl.lock();
@@ -79,4 +64,8 @@ public class LinkImpl
return _connectionImpl;
}
+    /**
+     * Starts opening the underlying AMQP link. This only passes the request
+     * on to the link; it does not wait for the request to complete.
+     *
+     * @param request the request handed to the underlying link's open call
+     */
+    public void open(final AmqpResourceRequest<Void> request) throws JmsTimeoutException, JmsInterruptedException
+    {
+        _amqpLink.open(request);
+    }
}
\ No newline at end of file
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1618896&r1=1618895&r2=1618896&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Tue Aug 19 16:48:22 2014
@@ -42,8 +42,8 @@ import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
-import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpReceiver;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
import org.apache.qpid.jms.engine.AmqpSender;
import org.apache.qpid.jms.engine.AmqpSession;
@@ -66,18 +66,6 @@ public class SessionImpl implements Sess
_messageIdHelper = messageIdHelper;
}
- void establish() throws JmsTimeoutException, JmsInterruptedException
- {
- _connectionImpl.waitUntil(new SimplePredicate("Session established", _amqpSession)
- {
- @Override
- public boolean test()
- {
- return _amqpSession.isEstablished();
- }
- }, AmqpConnection.TIMEOUT);
- }
-
ConnectionImpl getConnectionImpl()
{
return _connectionImpl;
@@ -88,10 +76,19 @@ public class SessionImpl implements Sess
_connectionImpl.lock();
try
{
- AmqpSender amqpSender = _amqpSession.createAmqpSender(address);
- SenderImpl sender = new SenderImpl(this, _connectionImpl, amqpSender, destination);
- _connectionImpl.stateChanged();
- sender.establish();
+ AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
+
+ SenderImpl sender = null;
+ synchronized (_connectionImpl.getAmqpConnection())
+ {
+ AmqpSender amqpSender = _amqpSession.createAmqpSender(address);
+ sender = new SenderImpl(this, _connectionImpl, amqpSender, destination);
+ sender.open(request);
+ _connectionImpl.stateChanged();
+ }
+
+ _connectionImpl.waitForResult(request, "Exception while creating sender to: " + address);
+
return sender;
}
finally
@@ -105,10 +102,19 @@ public class SessionImpl implements Sess
_connectionImpl.lock();
try
{
- AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(address);
- ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver, recieverDestination);
- _connectionImpl.stateChanged();
- receiver.establish();
+ AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
+
+ ReceiverImpl receiver = null;
+ AmqpReceiver amqpReceiver = null;
+ synchronized (_connectionImpl.getAmqpConnection())
+ {
+ amqpReceiver = _amqpSession.createAmqpReceiver(address);
+ receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver, recieverDestination);
+ receiver.open(request);
+ _connectionImpl.stateChanged();
+ }
+
+ _connectionImpl.waitForResult(request, "Exception while creating sender to: " + address);
if(_connectionImpl.isStarted())
{
@@ -136,6 +142,10 @@ public class SessionImpl implements Sess
return _destinationHelper;
}
+    /**
+     * Starts opening the underlying AMQP session. This only passes the
+     * request on to the session; it does not wait for the request to complete.
+     *
+     * @param request the request handed to the underlying session's open call
+     */
+    public void open(final AmqpResourceRequest<Void> request) throws JmsTimeoutException, JmsInterruptedException
+    {
+        _amqpSession.open(request);
+    }
//======= JMS Methods =======
@@ -146,20 +156,16 @@ public class SessionImpl implements Sess
_connectionImpl.lock();
try
{
- _amqpSession.close();
- _connectionImpl.stateChanged();
- while(!_amqpSession.isClosed())
+ AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
+
+ synchronized (_connectionImpl.getAmqpConnection())
{
- _connectionImpl.waitUntil(new SimplePredicate("Session is closed", _amqpSession)
- {
- @Override
- public boolean test()
- {
- return _amqpSession.isClosed();
- }
- }, AmqpConnection.TIMEOUT);
+ _amqpSession.close(request);
+ _connectionImpl.stateChanged();
}
+ _connectionImpl.waitForResult(request, "Exception while closing session");
+
if(_amqpSession.getSessionError().getCondition() != null)
{
throw new ConnectionException("Session close failed: " + _amqpSession.getSessionError());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org