You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/03/03 18:06:44 UTC
svn commit: r918575 - in /qpid/trunk/qpid/cpp: examples/messaging/
include/qpid/messaging/ src/ src/qpid/client/amqp0_10/ src/qpid/messaging/
src/tests/
Author: gsim
Date: Wed Mar 3 17:06:44 2010
New Revision: 918575
URL: http://svn.apache.org/viewvc?rev=918575&view=rev
Log:
QPID-2402 & QPID-2406: Documented the units for the ttl property of Message. Eliminated use of qpid::sys::Duration from API.
Added:
qpid/trunk/qpid/cpp/include/qpid/messaging/Duration.h
Modified:
qpid/trunk/qpid/cpp/examples/messaging/drain.cpp
qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp
Modified: qpid/trunk/qpid/cpp/examples/messaging/drain.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/drain.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/drain.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/drain.cpp Wed Mar 3 17:06:44 2010
@@ -28,14 +28,10 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
-#include <qpid/sys/Time.h>
#include <iostream>
using namespace qpid::messaging;
-using qpid::sys::Duration;
-using qpid::sys::TIME_INFINITE;
-using qpid::sys::TIME_SEC;
struct Options : public qpid::Options
{
@@ -67,8 +63,8 @@
Duration getTimeout()
{
- if (forever) return TIME_INFINITE;
- else return timeout*TIME_SEC;
+ if (forever) return INFINITE_DURATION;
+ else return timeout*DURATION_SEC;
}
bool parse(int argc, char** argv)
Added: qpid/trunk/qpid/cpp/include/qpid/messaging/Duration.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Duration.h?rev=918575&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Duration.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Duration.h Wed Mar 3 17:06:44 2010
@@ -0,0 +1,39 @@
+#ifndef QPID_MESSAGING_DURATION_H
+#define QPID_MESSAGING_DURATION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/IntegerTypes.h"
+#include <limits>
+
+namespace qpid {
+namespace messaging {
+
+/**
+ * A duration is a time in milliseconds.
+ */
+typedef uint64_t Duration;
+const Duration INFINITE_DURATION = std::numeric_limits<uint64_t>::max();
+const Duration DURATION_SEC = 1000;
+
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_DURATION_H*/
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h Wed Mar 3 17:06:44 2010
@@ -23,6 +23,7 @@
*/
#include <string>
+#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Variant.h"
#include "qpid/client/ClientImportExport.h"
@@ -67,8 +68,14 @@
QPID_CLIENT_EXTERN void setCorrelationId(const std::string&);
QPID_CLIENT_EXTERN const std::string& getCorrelationId() const;
- QPID_CLIENT_EXTERN void setTtl(uint64_t ttl);
- QPID_CLIENT_EXTERN uint64_t getTtl() const;
+ /**
+ * Set the time to live for this message in milliseconds.
+ */
+ QPID_CLIENT_EXTERN void setTtl(Duration ttl);
+ /**
+ *Get the time to live for this message in milliseconds.
+ */
+ QPID_CLIENT_EXTERN Duration getTtl() const;
QPID_CLIENT_EXTERN void setDurable(bool durable);
QPID_CLIENT_EXTERN bool getDurable() const;
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h Wed Mar 3 17:06:44 2010
@@ -24,7 +24,7 @@
#include "qpid/Exception.h"
#include "qpid/client/ClientImportExport.h"
#include "qpid/client/Handle.h"
-#include "qpid/sys/Time.h"
+#include "qpid/messaging/Duration.h"
namespace qpid {
namespace client {
@@ -57,14 +57,14 @@
* available. Returns false if there is no message to give after
* waiting for the specified timeout.
*/
- QPID_CLIENT_EXTERN bool get(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN bool get(Message& message, Duration timeout=INFINITE_DURATION);
/**
* Retrieves a message from this receivers local queue, or waits
* for upto the specified timeout for a message to become
* available. Throws NoMessageAvailable if there is no
* message to give after waiting for the specified timeout.
*/
- QPID_CLIENT_EXTERN Message get(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN Message get(Duration timeout=INFINITE_DURATION);
/**
* Retrieves a message for this receivers subscription or waits
* for upto the specified timeout for one to become
@@ -72,7 +72,7 @@
* that there is no message for the subscription this receiver is
* serving before returning false.
*/
- QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN bool fetch(Message& message, Duration timeout=INFINITE_DURATION);
/**
* Retrieves a message for this receivers subscription or waits
* for up to the specified timeout for one to become
@@ -80,7 +80,7 @@
* that there is no message for the subscription this receiver is
* serving before throwing an exception.
*/
- QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN Message fetch(Duration timeout=INFINITE_DURATION);
/**
* Sets the capacity for the receiver. The capacity determines how
* many incoming messages can be held in the receiver before being
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Wed Mar 3 17:06:44 2010
@@ -22,6 +22,7 @@
*
*/
#include "qpid/Exception.h"
+#include "qpid/messaging/Duration.h"
#include "qpid/client/ClientImportExport.h"
#include "qpid/client/Handle.h"
#include "qpid/sys/Time.h"
@@ -100,7 +101,7 @@
* which case the passed in receiver reference will be set to the
* receiver for that message or fals if no message was available.
*/
- QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, Duration timeout=INFINITE_DURATION);
/**
* Returns the receiver for the next available message. If there
* are no available messages at present the call will block for up
@@ -108,7 +109,7 @@
* Receiver::NoMessageAvailable if no message became available in
* time.
*/
- QPID_CLIENT_EXTERN Receiver nextReceiver(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+ QPID_CLIENT_EXTERN Receiver nextReceiver(Duration timeout=INFINITE_DURATION);
/**
* Create a new sender through which messages can be sent to the
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Mar 3 17:06:44 2010
@@ -812,6 +812,7 @@
../include/qpid/messaging/Address.h \
../include/qpid/messaging/Connection.h \
../include/qpid/messaging/Codec.h \
+ ../include/qpid/messaging/Duration.h \
../include/qpid/messaging/ListContent.h \
../include/qpid/messaging/ListView.h \
../include/qpid/messaging/MapContent.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Wed Mar 3 17:06:44 2010
@@ -40,28 +40,28 @@
}
}
-qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
+qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
return result;
}
-qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
+qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
return result;
}
-bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
Get f(*this, message, timeout);
while (!parent.execute(f)) {}
return f.result;
}
-bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
Fetch f(*this, message, timeout);
while (!parent.execute(f)) {}
@@ -143,12 +143,12 @@
parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), window(0) {}
-bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
return parent.get(*this, message, timeout);
}
-bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
if (state == CANCELLED) return false;//TODO: or should this be an error?
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Wed Mar 3 17:06:44 2010
@@ -27,7 +27,7 @@
#include "qpid/messaging/Variant.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
-#include "qpid/sys/Time.h"
+#include "qpid/messaging/Duration.h"
#include <memory>
namespace qpid {
@@ -50,10 +50,10 @@
const qpid::messaging::Address& address);
void init(qpid::client::AsyncSession session, AddressResolution& resolver);
- bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
- qpid::messaging::Message get(qpid::sys::Duration timeout);
- bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
- qpid::messaging::Message fetch(qpid::sys::Duration timeout);
+ bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message get(qpid::messaging::Duration timeout);
+ bool fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ qpid::messaging::Message fetch(qpid::messaging::Duration timeout);
void close();
void start();
void stop();
@@ -79,8 +79,8 @@
void startFlow();
//implementation of public facing methods
- bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
- bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void closeImpl();
void setCapacityImpl(uint32_t);
@@ -96,10 +96,10 @@
struct Get : Command
{
qpid::messaging::Message& message;
- qpid::sys::Duration timeout;
+ qpid::messaging::Duration timeout;
bool result;
- Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) :
+ Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) :
Command(i), message(m), timeout(t), result(false) {}
void operator()() { result = impl.getImpl(message, timeout); }
};
@@ -107,10 +107,10 @@
struct Fetch : Command
{
qpid::messaging::Message& message;
- qpid::sys::Duration timeout;
+ qpid::messaging::Duration timeout;
bool result;
- Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) :
+ Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) :
Command(i), message(m), timeout(t), result(false) {}
void operator()() { result = impl.fetchImpl(message, timeout); }
};
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Mar 3 17:06:44 2010
@@ -266,24 +266,33 @@
}
}
-bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout)
+qpid::sys::Duration adjust(qpid::messaging::Duration timeout)
{
- return incoming.get(handler, timeout);
+ if (timeout < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) {
+ return timeout * qpid::sys::TIME_MSEC;
+ } else {
+ return qpid::sys::TIME_INFINITE;
+ }
+}
+
+bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout)
+{
+ return incoming.get(handler, adjust(timeout));
}
-bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout)
+bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1));
return getIncoming(handler, timeout);
}
-bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout)
+bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
{
qpid::sys::Mutex::ScopedLock l(lock);
while (true) {
try {
std::string destination;
- if (incoming.getNextDestination(destination, timeout)) {
+ if (incoming.getNextDestination(destination, adjust(timeout))) {
Receivers::const_iterator i = receivers.find(destination);
if (i == receivers.end()) {
throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination));
@@ -300,7 +309,7 @@
}
}
-qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout)
+qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout)
{
qpid::messaging::Receiver receiver;
if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Wed Mar 3 17:06:44 2010
@@ -22,6 +22,7 @@
*
*/
#include "qpid/messaging/SessionImpl.h"
+#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Variant.h"
#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
@@ -68,12 +69,12 @@
qpid::messaging::Sender getSender(const std::string& name) const;
qpid::messaging::Receiver getReceiver(const std::string& name) const;
- bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout);
- qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout);
+ bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout);
+ qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout);
qpid::messaging::Connection getConnection() const;
- bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
@@ -114,7 +115,7 @@
const bool transactional;
bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
- bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout);
+ bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp Wed Mar 3 17:06:44 2010
@@ -50,8 +50,8 @@
void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
const std::string& Message::getCorrelationId() const { return impl->correlationId; }
-void Message::setTtl(uint64_t ttl) { impl->ttl = ttl; }
-uint64_t Message::getTtl() const { return impl->ttl; }
+void Message::setTtl(Duration ttl) { impl->ttl = ttl; }
+Duration Message::getTtl() const { return impl->ttl; }
void Message::setDurable(bool durable) { impl->durable = durable; }
bool Message::getDurable() const { return impl->durable; }
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp Wed Mar 3 17:06:44 2010
@@ -39,10 +39,10 @@
Receiver::Receiver(const Receiver& s) : qpid::client::Handle<ReceiverImpl>() { PI::copy(*this, s); }
Receiver::~Receiver() { PI::dtor(*this); }
Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); }
-bool Receiver::get(Message& message, qpid::sys::Duration timeout) { return impl->get(message, timeout); }
-Message Receiver::get(qpid::sys::Duration timeout) { return impl->get(timeout); }
-bool Receiver::fetch(Message& message, qpid::sys::Duration timeout) { return impl->fetch(message, timeout); }
-Message Receiver::fetch(qpid::sys::Duration timeout) { return impl->fetch(timeout); }
+bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); }
+Message Receiver::get(Duration timeout) { return impl->get(timeout); }
+bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); }
+Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
uint32_t Receiver::available() { return impl->available(); }
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Wed Mar 3 17:06:44 2010
@@ -22,7 +22,6 @@
*
*/
#include "qpid/RefCounted.h"
-#include "qpid/sys/Time.h"
namespace qpid {
namespace client {
@@ -38,10 +37,10 @@
{
public:
virtual ~ReceiverImpl() {}
- virtual bool get(Message& message, qpid::sys::Duration timeout) = 0;
- virtual Message get(qpid::sys::Duration timeout) = 0;
- virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
- virtual Message fetch(qpid::sys::Duration timeout) = 0;
+ virtual bool get(Message& message, Duration timeout) = 0;
+ virtual Message get(Duration timeout) = 0;
+ virtual bool fetch(Message& message, Duration timeout) = 0;
+ virtual Message fetch(Duration timeout) = 0;
virtual void setCapacity(uint32_t) = 0;
virtual uint32_t getCapacity() = 0;
virtual uint32_t available() = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Wed Mar 3 17:06:44 2010
@@ -76,13 +76,13 @@
impl->flush();
}
-bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout)
+bool Session::nextReceiver(Receiver& receiver, Duration timeout)
{
return impl->nextReceiver(receiver, timeout);
}
-Receiver Session::nextReceiver(qpid::sys::Duration timeout)
+Receiver Session::nextReceiver(Duration timeout)
{
return impl->nextReceiver(timeout);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Wed Mar 3 17:06:44 2010
@@ -23,7 +23,7 @@
*/
#include "qpid/RefCounted.h"
#include <string>
-#include "qpid/sys/Time.h"
+#include "qpid/messaging/Duration.h"
namespace qpid {
namespace client {
@@ -50,8 +50,8 @@
virtual void flush() = 0;
virtual Sender createSender(const Address& address) = 0;
virtual Receiver createReceiver(const Address& address) = 0;
- virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) = 0;
- virtual Receiver nextReceiver(qpid::sys::Duration timeout) = 0;
+ virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0;
+ virtual Receiver nextReceiver(Duration timeout) = 0;
virtual uint32_t available() = 0;
virtual uint32_t pendingAck() = 0;
virtual Sender getSender(const std::string& name) const = 0;
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Mar 3 17:06:44 2010
@@ -130,7 +130,7 @@
Message out(Uuid(true).str());
s.send(out);
Message in;
- BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC));
+ BOOST_CHECK(r.fetch(in, 5*DURATION_SEC));
BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
r.close();
s.close();
@@ -196,7 +196,7 @@
}
};
-std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+std::vector<std::string> fetch(Receiver& receiver, int count, Duration timeout=DURATION_SEC*5)
{
std::vector<std::string> data;
Message message;
@@ -215,7 +215,7 @@
}
void receive(Receiver& receiver, uint count = 1, uint start = 1,
- const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+ const std::string& base = "Message", Duration timeout=DURATION_SEC*5)
{
for (uint i = start; i < start + count; ++i) {
BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
@@ -229,7 +229,7 @@
Message out("test-message");
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Message in = receiver.fetch(5 * DURATION_SEC);
fix.session.acknowledge();
BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
}
@@ -246,7 +246,7 @@
Receiver receiver = fix.session.createReceiver(fix.queue);
Message in;
for (uint i = 0; i < 10; ++i) {
- BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC));
+ BOOST_CHECK(receiver.fetch(in, 5 * DURATION_SEC));
BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
fix.session.acknowledge();
@@ -323,7 +323,7 @@
for (uint i = 0; i < fix.queues.size(); i++) {
Message msg;
- BOOST_CHECK(fix.session.nextReceiver().fetch(msg, qpid::sys::TIME_SEC));
+ BOOST_CHECK(fix.session.nextReceiver().fetch(msg, DURATION_SEC));
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
}
}
@@ -339,7 +339,7 @@
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Message in = receiver.fetch(5 * DURATION_SEC);
MapView view(in);
BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
@@ -358,7 +358,7 @@
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Message in = receiver.fetch(5 * DURATION_SEC);
MapView view(in);
BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
@@ -378,7 +378,7 @@
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Message in = receiver.fetch(5 * DURATION_SEC);
ListView view(in);
BOOST_CHECK_EQUAL(view.size(), content.size());
BOOST_CHECK_EQUAL(view.front().asString(), "abc");
@@ -412,7 +412,7 @@
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Message in = receiver.fetch(5 * DURATION_SEC);
ListView view(in);
BOOST_CHECK_EQUAL(view.size(), content.size());
BOOST_CHECK_EQUAL(view.front().asString(), "abc");
@@ -441,10 +441,10 @@
Message m2("accept-me");
sender.send(m2);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Message in = receiver.fetch(5 * DURATION_SEC);
BOOST_CHECK_EQUAL(in.getContent(), m1.getContent());
fix.session.reject(in);
- in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ in = receiver.fetch(5 * DURATION_SEC);
BOOST_CHECK_EQUAL(in.getContent(), m2.getContent());
fix.session.acknowledge();
}
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp Wed Mar 3 17:06:44 2010
@@ -27,16 +27,12 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
-#include <qpid/sys/Time.h>
#include "TestOptions.h"
#include <iostream>
using namespace qpid::messaging;
-using qpid::sys::Duration;
-using qpid::sys::TIME_INFINITE;
-using qpid::sys::TIME_SEC;
using namespace std;
@@ -94,8 +90,8 @@
Duration getTimeout()
{
- if (forever) return TIME_INFINITE;
- else return timeout*TIME_SEC;
+ if (forever) return INFINITE_DURATION;
+ else return timeout*DURATION_SEC;
}
bool parse(int argc, char** argv)
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp?rev=918575&r1=918574&r2=918575&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_stream.cpp Wed Mar 3 17:06:44 2010
@@ -32,7 +32,6 @@
#include <string>
using namespace qpid::messaging;
-using namespace qpid::sys;
namespace qpid {
namespace tests {
@@ -58,13 +57,13 @@
const std::string TIMESTAMP = "ts";
-uint64_t timestamp(const AbsTime& time)
+uint64_t timestamp(const qpid::sys::AbsTime& time)
{
- Duration t(time);
+ qpid::sys::Duration t(time);
return t;
}
-struct Client : Runnable
+struct Client : qpid::sys::Runnable
{
virtual ~Client() {}
virtual void doWork(Session&) = 0;
@@ -83,9 +82,9 @@
}
}
- Thread thread;
+ qpid::sys::Thread thread;
- void start() { thread = Thread(this); }
+ void start() { thread = qpid::sys::Thread(this); }
void join() { thread.join(); }
};
@@ -95,20 +94,20 @@
{
Sender sender = session.createSender(opts.address);
Message msg;
- uint64_t interval = TIME_SEC / opts.rate;
+ uint64_t interval = qpid::sys::TIME_SEC / opts.rate;
uint64_t sent = 0, missedRate = 0;
- AbsTime start = now();
+ qpid::sys::AbsTime start = qpid::sys::now();
while (true) {
- AbsTime sentAt = now();
+ qpid::sys::AbsTime sentAt = qpid::sys::now();
msg.getHeaders()[TIMESTAMP] = timestamp(sentAt);
sender.send(msg);
++sent;
- AbsTime waitTill(start, sent*interval);
- Duration delay(sentAt, waitTill);
+ qpid::sys::AbsTime waitTill(start, sent*interval);
+ qpid::sys::Duration delay(sentAt, waitTill);
if (delay < 0) {
++missedRate;
} else {
- qpid::sys::usleep(delay / TIME_USEC);
+ qpid::sys::usleep(delay / qpid::sys::TIME_USEC);
}
}
}
@@ -128,9 +127,9 @@
session.acknowledge();//TODO: add batching option
++received;
//calculate latency
- uint64_t receivedAt = timestamp(now());
+ uint64_t receivedAt = timestamp(qpid::sys::now());
uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();
- double latency = ((double) (receivedAt - sentAt)) / TIME_MSEC;
+ double latency = ((double) (receivedAt - sentAt)) / qpid::sys::TIME_MSEC;
//update avg, min & max
minLatency = std::min(minLatency, latency);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org