You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by Rajith Attapattu <ra...@gmail.com> on 2012/01/25 01:31:03 UTC

Fwd: failure notice

The drainDispatchQueue() method has a fair amount of duplicate code
with the syncDispatchQueue() method.
Next step is to combine the common parts to get rid of duplicate code.

Rajith
---------- Forwarded message ----------
From:  <MA...@apache.org>
Date: Tue, Jan 24, 2012 at 6:48 PM
Subject: failure notice
To: rajith77@gmail.com


Hi. This is the qmail-send program at apache.org.
I'm afraid I wasn't able to deliver your message to the following addresses.
This is a permanent error; I've given up. Sorry it didn't work out.

<co...@qpid.apache.org>:
Must be sent from an @apache.org address.

--- Below this line is a copy of the message.

Return-Path: <ra...@gmail.com>
Received: (qmail 63077 invoked by uid 99); 24 Jan 2012 23:48:06 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
   by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2012 23:48:06 +0000
X-ASF-Spam-Status: No, hits=-0.5 required=5.0
       tests=FREEMAIL_ENVFROM_END_DIGIT,RCVD_IN_DNSWL_LOW,SPF_PASS
X-Spam-Check-By: apache.org
Received-SPF: pass (nike.apache.org: domain of rajith77@gmail.com
designates 209.85.214.170 as permitted sender)
Received: from [209.85.214.170] (HELO mail-tul01m020-f170.google.com)
(209.85.214.170)
   by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2012 23:47:57 +0000
Received: by obbup3 with SMTP id up3so3359185obb.15
       for <co...@qpid.apache.org>; Tue, 24 Jan 2012 15:47:36 -0800 (PST)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
       d=gmail.com; s=gamma;
       h=mime-version:in-reply-to:references:date:message-id:subject:from:to
        :content-type:content-transfer-encoding;
       bh=szoMtIlzqxU0xVU4u3wYecP/SmwatOSwERUNmFdPMTg=;
       b=KDIxjVicAJNjeUe0nofclYwJquaBDD3vSzvgatx1/C5IpB/bYPSNK5V47KJ54MZsm9
        Cw2gEXTsSmZUJJHr4JFpmQ79qtlWMeXQHfLHYshY+GtcidaC0GOvu7L2IKTF0bq5stI2
        Sw/y1xnzBiGStfzpwJdebxJeCIdX13ugVnGvk=
MIME-Version: 1.0
Received: by 10.182.192.36 with SMTP id hd4mr932729obc.60.1327448856745; Tue,
 24 Jan 2012 15:47:36 -0800 (PST)
Received: by 10.60.28.225 with HTTP; Tue, 24 Jan 2012 15:47:36 -0800 (PST)
In-Reply-To: <20...@eris.apache.org>
References: <20...@eris.apache.org>
Date: Tue, 24 Jan 2012 18:47:36 -0500
Message-ID: <CA...@mail.gmail.com>
Subject: Re: svn commit: r1235550 - in
/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
 AMQSession.java AMQSession_0_10.java
From: Rajith Attapattu <ra...@gmail.com>
To: commits@qpid.apache.org
Content-Type: text/plain; charset=ISO-8859-1
Content-Transfer-Encoding: quoted-printable
X-Virus-Checked: Checked by ClamAV on apache.org

The drainDispatchQueue() method has a fair amount of duplicate code
with the syncDispatchQueue() method.
Next step is to combine the common parts to get rid of duplicate code.

Rajith

On Tue, Jan 24, 2012 at 6:26 PM,  <ra...@apache.org> wrote:
> Author: rajith
> Date: Tue Jan 24 23:26:46 2012
> New Revision: 1235550
>
> URL: http://svn.apache.org/viewvc?rev=3D1235550&view=3Drev
> Log:
> QPID-3604 Once message stop is issued for each subscriber, the client
> now drains the internal queues of each subscriber. It also drains the
> dispatch queue. These messages are then released without marking them as
> redelivered. Messages that were given to the application but were not
> acked are also released, but are marked as redelivered. All messages
> received upto that point are marked as completed.
>
> Modified:
> =A0 =A0qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/A=
MQSession.java
> =A0 =A0qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/A=
MQSession_0_10.java
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/clien=
t/AMQSession.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/ja=
va/org/apache/qpid/client/AMQSession.java?rev=3D1235550&r1=3D1235549&r2=3D1=
235550&view=3Ddiff
> =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS=
ession.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS=
ession.java Tue Jan 24 23:26:46 2012
> @@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
> =A0 =A0 =A0* Set when the dispatcher should direct incoming messages stra=
ight into the UnackedMessage list instead of
> =A0 =A0 =A0* to the syncRecieveQueue or MessageListener. Used during clea=
nup, e.g. in Session.recover().
> =A0 =A0 =A0*/
> - =A0 =A0private volatile boolean _usingDispatcherForCleanup;
> + =A0 =A0protected volatile boolean _usingDispatcherForCleanup;
>
> =A0 =A0 /** Used to indicates that the connection to which this session b=
elongs, has been stopped. */
> =A0 =A0 private boolean _connectionStopped;
> @@ -2247,6 +2247,58 @@ public abstract class AMQSession<C exten
> =A0 =A0 =A0 =A0 }
> =A0 =A0 }
>
> + =A0 =A0void drainDispatchQueue()
> + =A0 =A0{
> + =A0 =A0 =A0 =A0if (Thread.currentThread() =3D=3D _dispatcherThread)
> + =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0while (!_closed.get() && !_queue.isEmpty())
> + =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Dispatchable disp;
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0disp =3D (Dispatchable) _queue.t=
ake();
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0catch (InterruptedException e)
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0throw new RuntimeException(e);
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}
> +
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0// Check just in case _queue becomes emp=
ty, it shouldn't but
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0// better than an NPE.
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (disp =3D=3D null)
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0_logger.debug("_queue became emp=
ty during sync.");
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0break;
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}
> +
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0disp.dispatch(AMQSession.this);
> + =A0 =A0 =A0 =A0 =A0 =A0}
> + =A0 =A0 =A0 =A0}
> + =A0 =A0 =A0 =A0else
> + =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0startDispatcherIfNecessary(false);
> +
> + =A0 =A0 =A0 =A0 =A0 =A0final CountDownLatch signal =3D new CountDownLat=
ch(1);
> +
> + =A0 =A0 =A0 =A0 =A0 =A0_queue.add(new Dispatchable()
> + =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0public void dispatch(AMQSession ssn)
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0signal.countDown();
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}
> + =A0 =A0 =A0 =A0 =A0 =A0});
> +
> + =A0 =A0 =A0 =A0 =A0 =A0try
> + =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0signal.await();
> + =A0 =A0 =A0 =A0 =A0 =A0}
> + =A0 =A0 =A0 =A0 =A0 =A0catch (InterruptedException e)
> + =A0 =A0 =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0throw new RuntimeException(e);
> + =A0 =A0 =A0 =A0 =A0 =A0}
> + =A0 =A0 =A0 =A0}
> + =A0 =A0}
> +
> =A0 =A0 /**
> =A0 =A0 =A0* Resubscribes all producers and consumers. This is called whe=
n performing failover.
> =A0 =A0 =A0*
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/clien=
t/AMQSession_0_10.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/ja=
va/org/apache/qpid/client/AMQSession_0_10.java?rev=3D1235550&r1=3D1235549&r=
2=3D1235550&view=3Ddiff
> =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS=
ession_0_10.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS=
ession_0_10.java Tue Jan 24 23:26:46 2012
> @@ -1354,5 +1354,45 @@ public class AMQSession_0_10 extends AMQ
> =A0 =A0 =A0 =A0 super.resubscribe();
> =A0 =A0 =A0 =A0 getQpidSession().sync();
> =A0 =A0 }
> +
> + =A0 =A0@Override
> + =A0 =A0void stop() throws AMQException
> + =A0 =A0{
> + =A0 =A0 =A0 =A0super.stop();
> + =A0 =A0 =A0 =A0synchronized (getMessageDeliveryLock())
> + =A0 =A0 =A0 =A0{
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 for (BasicMessageConsumer consumer : _consu=
mers.values())
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 {
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 List<Long> tags =3D consumer.drainR=
eceiverQueueAndRetrieveDeliveryTags();
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 _prefetchedMessageTags.addAll(tags)=
;
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 }
> + =A0 =A0 =A0 =A0}
> + =A0 =A0 =A0 =A0_usingDispatcherForCleanup =3D true;
> + =A0 =A0 =A0 =A0drainDispatchQueue();
> + =A0 =A0 =A0 =A0_usingDispatcherForCleanup =3D false;
> +
> + =A0 =A0 =A0 =A0RangeSet delivered =3D gatherRangeSet(_unacknowledgedMes=
sageTags);
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 RangeSet prefetched =3D gatherRangeSet(_pre=
fetchedMessageTags);
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 RangeSet all =3D RangeSetFactory.createRang=
eSet(delivered.size()
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0=
 =A0 + prefetched.size());
> +
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 for (Iterator<Range> deliveredIter =3D deli=
vered.iterator(); deliveredIter.hasNext();)
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 {
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 Range range =3D deliveredIt=
er.next();
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 all.add(range);
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 }
> +
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 for (Iterator<Range> prefetchedIter =3D pre=
fetched.iterator(); prefetchedIter.hasNext();)
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 {
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 Range range =3D prefetchedI=
ter.next();
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 all.add(range);
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 }
> +
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 flushProcessed(all, false);
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 getQpidSession().messageRelease(delivered,O=
ption.SET_REDELIVERED);
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 getQpidSession().messageRelease(prefetched)=
;
> + =A0 =A0 =A0 =A0 =A0 =A0 =A0 sync();
> + =A0 =A0}
> +
> =A0}
>
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project: =A0 =A0 =A0http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org