You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/27 17:49:55 UTC
svn commit: r551207 [1/2] - in /incubator/qpid/trunk/qpid: ./ java/
java/broker/etc/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/exchange/
java/broker/src/main/java/org/apache/qpid/server/protocol/...
Author: ritchiem
Date: Wed Jun 27 08:49:51 2007
New Revision: 551207
URL: http://svn.apache.org/viewvc?view=rev&rev=551207
Log:
Merged revisions 550748-551121 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line
Added xml file for logging during sustained tests
........
r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line
Immediate and mandatory flag tests added.
........
r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line
QPID-509 Mandatory messages not returned outside a transaction. They are now.
........
r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines
Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki.
http://cwiki.apache.org/qpid/sustained-tests.html
........
r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line
Added intelij files to ignore list
........
r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line
POM update to add Apache content to built jars
........
r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line
Updated default guest password as it was not correct.
........
r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line
Added additional information to log message when available to aid the explination of a failed connection
........
Added:
incubator/qpid/trunk/qpid/java/integrationtests/src/resources/sustained-log4j.xml
- copied unchanged from r551121, incubator/qpid/branches/M2/java/integrationtests/src/resources/sustained-log4j.xml
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/java/broker/etc/md5passwd
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
incubator/qpid/trunk/qpid/java/management/eclipse-plugin/ (props changed)
incubator/qpid/trunk/qpid/java/pom.xml
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/trunk/qpid/java/broker/etc/md5passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/md5passwd?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/md5passwd (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/md5passwd Wed Jun 27 08:49:51 2007
@@ -16,6 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-guest:qfgyy4ewnVMBg
+guest:CE4DQ6BIb/BVMN9scFyLtA==
admin:ISMvKXpXpadDiUoOSoAfww==
user:aBzonUodYLhwSa8s9A10sA==
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Jun 27 08:49:51 2007
@@ -20,18 +20,9 @@
*/
package org.apache.qpid.server;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
@@ -52,6 +43,16 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AMQChannel
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -208,7 +209,8 @@
_currentMessage.setPublisher(publisher);
}
- public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+ throws AMQException
{
if (_currentMessage == null)
{
@@ -230,6 +232,7 @@
// check and deliver if header says body length is zero
if (contentHeaderBody.bodySize == 0)
{
+ _txnContext.messageProcessed(protocolSession);
_currentMessage = null;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Wed Jun 27 08:49:51 2007
@@ -169,7 +169,7 @@
if (queues == null || queues.isEmpty())
{
String msg = "Routing key " + routingKey + " is not known to " + this;
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Wed Jun 27 08:49:51 2007
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -41,24 +46,21 @@
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
public class DestWildExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
- private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
- // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+ private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final String TOPIC_SEPARATOR = ".";
private static final String AMQP_STAR = "*";
private static final String AMQP_HASH = "#";
@@ -90,7 +92,7 @@
queueList.add(q.getName().toString());
}
- Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
+ Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -118,7 +120,6 @@
} // End of MBean class
-
public AMQShortString getType()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -140,6 +141,7 @@
{
queueList = _routingKey2queues.get(routingKey);
}
+
if (!queueList.contains(queue))
{
queueList.add(queue);
@@ -165,8 +167,8 @@
for (int index = 0; index < size; index++)
{
- //if there are more levels
- if (index + 1 < size)
+ // if there are more levels
+ if ((index + 1) < size)
{
if (_subscription.get(index).equals(AMQP_HASH))
{
@@ -175,7 +177,7 @@
// we don't need #.# delete this one
_subscription.remove(index);
size--;
- //redo this normalisation
+ // redo this normalisation
index--;
}
@@ -186,7 +188,7 @@
_subscription.add(index + 1, _subscription.remove(index));
}
}
- }//if we have more levels
+ } // if we have more levels
}
StringBuilder sb = new StringBuilder();
@@ -211,9 +213,9 @@
List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
- if (queues == null || queues.size() == 0)
+ if ((queues == null) || queues.isEmpty())
{
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
String msg = "Topic " + routingKey + " is not known to " + this;
throw new NoRouteException(msg, payload, null);
@@ -222,6 +224,7 @@
{
_logger.warn("No queues found for routing key " + routingKey);
_logger.warn("Routing map contains: " + _routingKey2queues);
+
return;
}
}
@@ -238,14 +241,15 @@
public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && queues.contains(queue);
- }
+ return (queues != null) && queues.contains(queue);
+ }
public boolean isBound(AMQShortString routingKey) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && !queues.isEmpty();
+
+ return (queues != null) && !queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
@@ -257,6 +261,7 @@
return true;
}
}
+
return false;
}
@@ -279,12 +284,14 @@
" with routing key " + routingKey + ". No queue was registered with that routing key", null);
}
+
boolean removedQ = queues.remove(queue);
if (!removedQ)
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey, null);
}
+
if (queues.isEmpty())
{
_routingKey2queues.remove(routingKey);
@@ -304,7 +311,6 @@
}
}
-
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -334,7 +340,6 @@
queueList.add(queTok.nextToken());
}
-
int depth = 0;
boolean matching = true;
boolean done = false;
@@ -343,25 +348,26 @@
while (matching && !done)
{
- if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip)
+ if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
{
done = true;
// if it was the routing key that ran out of digits
- if (routingkeyList.size() == depth + routingskip)
+ if (routingkeyList.size() == (depth + routingskip))
{
if (queueList.size() > (depth + queueskip))
- { // a hash and it is the last entry
- matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1;
+ { // a hash and it is the last entry
+ matching =
+ queueList.get(depth + queueskip).equals(AMQP_HASH)
+ && (queueList.size() == (depth + queueskip + 1));
}
}
- else if (routingkeyList.size() > depth + routingskip)
+ else if (routingkeyList.size() > (depth + routingskip))
{
// There is still more routing key to check
matching = false;
}
-
continue;
}
@@ -377,27 +383,33 @@
else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
{
// Is this a # at the end
- if (queueList.size() == depth + queueskip + 1)
+ if (queueList.size() == (depth + queueskip + 1))
{
done = true;
+
continue;
}
// otherwise # in the middle
- while (routingkeyList.size() > depth + routingskip)
+ while (routingkeyList.size() > (depth + routingskip))
{
if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
{
queueskip++;
depth++;
+
break;
}
+
routingskip++;
}
+
continue;
}
+
matching = false;
}
+
depth++;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Wed Jun 27 08:49:51 2007
@@ -1,27 +1,36 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
-
package org.apache.qpid.server.exchange;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -36,16 +45,7 @@
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
{
@@ -63,7 +63,7 @@
private final class FanoutExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
+ public FanoutExchangeMBean() throws JMException
{
super();
_exchangeType = "fanout";
@@ -79,9 +79,7 @@
{
String queueName = queue.getName().toString();
-
-
- Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ Object[] bindingItemValues = { queueName, new String[] { queueName } };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -98,7 +96,7 @@
}
try
- {
+ {
queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
@@ -107,8 +105,7 @@
}
}
- }// End of MBean class
-
+ } // End of MBean class
protected ExchangeMBean createMBean() throws AMQException
{
@@ -147,7 +144,6 @@
{
assert queue != null;
-
if (!_queues.remove(queue))
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
@@ -159,10 +155,10 @@
{
final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
final AMQShortString routingKey = publishInfo.getRoutingKey();
- if (_queues == null || _queues.isEmpty())
+ if ((_queues == null) || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory())
+ if (publishInfo.isMandatory() || publishInfo.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
@@ -193,12 +189,11 @@
public boolean isBound(AMQShortString routingKey) throws AMQException
{
- return _queues != null && !_queues.isEmpty();
+ return (_queues != null) && !_queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
{
-
return _queues.contains(queue);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Jun 27 08:49:51 2007
@@ -229,7 +229,7 @@
String msg = "Exchange " + getName() + ": message not routable.";
- if (payload.getMessagePublishInfo().isMandatory())
+ if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Wed Jun 27 08:49:51 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,24 +20,13 @@
*/
package org.apache.qpid.server.protocol;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.security.Principal;
-
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -46,22 +35,34 @@
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public class AMQMinaProtocolSession implements AMQProtocolSession,
- Managable
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -111,25 +112,20 @@
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
-
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
+ throws AMQException
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
-
_codecFactory = codecFactory;
-
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -140,16 +136,15 @@
catch (RuntimeException e)
{
e.printStackTrace();
- // throw e;
+ // throw e;
}
-// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+ // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
+ AMQStateManager stateManager) throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
@@ -182,8 +177,7 @@
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
- public void dataBlockReceived(AMQDataBlock message)
- throws Exception
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
if (message instanceof ProtocolInitiation)
@@ -203,8 +197,7 @@
}
}
- private void frameReceived(AMQFrame frame)
- throws AMQException
+ private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
@@ -252,13 +245,13 @@
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) getProtocolMajorVersion(), // versionMajor
- (short) getProtocolMinorVersion()); // versionMinor
+ AMQFrame response =
+ ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) getProtocolMajorVersion(), // versionMajor
+ (short) getProtocolMinorVersion()); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -269,21 +262,19 @@
// TODO: Close connection (but how to wait until message is sent?)
// ritchiem 2006-12-04 will this not do?
-// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
-// future.join();
-// close connection
+ // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
+ // future.join();
+ // close connection
}
}
-
private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
- methodBody);
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- //Check that this channel is not closing
+ // Check that this channel is not closing
if (channelAwaitingClosure(channelId))
{
if ((evt.getMethod() instanceof ChannelCloseOkBody))
@@ -299,11 +290,11 @@
{
_logger.info("Channel[" + channelId + "] awaiting closure ignoring");
}
+
return;
}
}
-
try
{
try
@@ -315,10 +306,10 @@
{
for (AMQMethodListener listener : _frameListeners)
{
- wasAnyoneInterested = listener.methodReceived(evt) ||
- wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
+
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt, null);
@@ -332,6 +323,7 @@
{
_logger.info("Closing channel due to: " + e.getMessage());
}
+
writeFrame(e.getCloseFrame(channelId));
closeChannel(channelId);
}
@@ -341,14 +333,17 @@
{
_logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
}
+
if (_logger.isInfoEnabled())
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
- AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ AMQConnectionException ce =
+ evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(ce.getCloseFrame(channelId));
@@ -360,6 +355,7 @@
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));
@@ -372,17 +368,17 @@
{
listener.error(e);
}
+
_minaProtocolSession.close();
}
}
-
private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
- channel.publishContentHeader(body);
+ channel.publishContentHeader(body, this);
}
@@ -427,15 +423,15 @@
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null);
}
+
return channel;
}
public AMQChannel getChannel(int channelId) throws AMQException
{
- final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId)
- ? _cachedChannels[channelId]
- : _channelMap.get(channelId);
- if (channel == null || channel.isClosing())
+ final AMQChannel channel =
+ ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ if ((channel == null) || channel.isClosing())
{
return null;
}
@@ -466,8 +462,9 @@
if (_channelMap.size() == _maxNoOfChannels)
{
- String errorMessage = toString() + ": maximum number of channels has been reached (" +
- _maxNoOfChannels + "); can't create channel";
+ String errorMessage =
+ toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ + "); can't create channel";
_logger.error(errorMessage);
throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null);
}
@@ -480,6 +477,7 @@
{
_cachedChannels[channelId] = channel;
}
+
checkForNotification();
}
@@ -504,7 +502,7 @@
public void commitTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.commit();
}
@@ -512,7 +510,7 @@
public void rollbackTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.rollback();
}
@@ -597,6 +595,7 @@
{
channel.close(this);
}
+
_channelMap.clear();
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
@@ -615,6 +614,7 @@
{
_managedObject.unregister();
}
+
for (Task task : _taskList)
{
task.doTask(this);
@@ -687,6 +687,7 @@
{
setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
}
+
if (_clientProperties.getString(ClientProperties.version.toString()) != null)
{
_clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
@@ -715,7 +716,7 @@
public boolean isProtocolVersion(byte major, byte minor)
{
- return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor;
+ return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
}
public VersionSpecificRegistry getRegistry()
@@ -723,13 +724,11 @@
return _registry;
}
-
public Object getClientIdentifier()
{
return _minaProtocolSession.getRemoteAddress();
}
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -769,6 +768,6 @@
public String getClientVersion()
{
- return _clientVersion == null ? null : _clientVersion.toString();
+ return (_clientVersion == null) ? null : _clientVersion.toString();
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Jun 27 08:49:51 2007
@@ -49,6 +49,16 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.JMException;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
* fully in RFC 006.
@@ -607,7 +617,7 @@
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
// fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
@@ -623,7 +633,7 @@
// from the queue:
dequeue(storeContext, msg);
}
- }
+ }*/
// public DeliveryManager getDeliveryManager()
// {
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Jun 27 08:49:51 2007
@@ -337,7 +337,14 @@
if ((message == null) || message.equals(""))
{
- message = "Unable to Connect";
+ if (message == null)
+ {
+ message = "Unable to Connect";
+ }
+ else // can only be "" if getMessage() returned it therfore lastException != null
+ {
+ message = "Unable to Connect:" + lastException.getClass();
+ }
}
AMQException e = new AMQConnectionFailureException(message, null);
Modified: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java (original)
+++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java Wed Jun 27 08:49:51 2007
@@ -252,7 +252,8 @@
public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+ + ", String virtualHost = " + virtualHost + " ): called");
try
{
Modified: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java (original)
+++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java Wed Jun 27 08:49:51 2007
@@ -37,6 +37,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -91,7 +92,9 @@
private static final long TEN_MILLI_SEC = 10000000;
- private static final long FIVE_MILLI_SEC = 5000000;
+ private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
+ private static final int LOG_UPATE_INTERVAL = 10;
+ private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
/**
* Should provide the name of the test case that this class implements. The exact names are defined in the interop
@@ -129,6 +132,7 @@
String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+ String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
if (debugLog.isDebugEnabled())
{
@@ -150,7 +154,9 @@
session = new Session[1];
connection[0] =
- org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+ clientName,
+ org.apache.qpid.interop.testclient.TestClient.brokerUrl,
org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[0] = connection[0].createSession(false, ackMode);
@@ -182,6 +188,7 @@
{
connection[i] =
org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+ clientName,
org.apache.qpid.interop.testclient.TestClient.brokerUrl,
org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[i] = connection[i].createSession(false, ackMode);
@@ -192,7 +199,7 @@
MessageConsumer consumer = session[i].createConsumer(sendDestination);
- consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination));
+ consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination));
}
break;
@@ -347,7 +354,7 @@
_received++;
if (((TextMessage) message).getText().equals("start"))
{
- debugLog.info("Starting Batch");
+ debugLog.debug("Starting Batch");
_startTime = System.nanoTime();
}
else if (((TextMessage) message).getText().equals("end"))
@@ -355,8 +362,8 @@
if (_startTime != null)
{
long currentTime = System.nanoTime();
- sendStatus(currentTime - _startTime, _received);
- debugLog.info("End Batch");
+ sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
+ debugLog.debug("End Batch");
}
}
}
@@ -373,28 +380,31 @@
*
* @param time taken for the the last batch
* @param received Total number of messages received.
- *
+ * @param batchNumber the batch number
* @throws JMSException if an error occurs during the send
*/
- private void sendStatus(long time, long received) throws JMSException
+ private void sendStatus(long time, long received, int batchNumber) throws JMSException
{
Message updateMessage = _session.createTextMessage("update");
updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
updateMessage.setLongProperty("RECEIVED", received);
+ updateMessage.setIntProperty("BATCH", batchNumber);
updateMessage.setLongProperty("DURATION", time);
if (debugLog.isInfoEnabled())
{
- debugLog.info("**** SENDING [" + received / _batchSize + "]**** "
- + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ debugLog.info("**** SENDING [" + batchNumber + "]**** "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
}
// Output on the main log.info the details of this batch
- if (received / _batchSize % 10 == 0)
+ if (batchNumber % 10 == 0)
{
- log.info("Sending Report [" + received / _batchSize + "] "
- + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ log.info("Sending Report [" + batchNumber + "] "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
}
_updater.send(updateMessage);
@@ -415,7 +425,7 @@
class SustainedRateAdapter implements MessageListener, Runnable
{
private SustainedTestClient _client;
- private long _messageVariance = 500; //no. messages to allow drifting
+ private long _batchVariance = 3; //no. batches to allow drifting
private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
private volatile long _delay; //in nanos
private long _sent;
@@ -451,18 +461,23 @@
long duration = message.getLongProperty("DURATION");
long totalReceived = message.getLongProperty("RECEIVED");
String client = message.getStringProperty("CLIENT_ID");
+ int batchNumber = message.getIntProperty("BATCH");
- if (debugLog.isInfoEnabled())
+ if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
{
- debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration);
+ debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived
+ + " Recevied BATCH:" + batchNumber + " DURATION:" + duration);
}
- recordSlow(client, totalReceived);
-
- adjustDelay(client, totalReceived, duration);
+ recordSlow(client, totalReceived, batchNumber);
+ adjustDelay(client, batchNumber, duration);
- if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2)
+ // Warm up completes when:
+ // we haven't warmed up
+ // and the number of batches sent to each client is at least half of the required warmup batches
+ if (!_warmedup
+ && (batchNumber >= _warmUpBatches))
{
_warmedup = true;
_warmup.countDown();
@@ -478,7 +493,7 @@
CountDownLatch _warmup = new CountDownLatch(1);
- int _warmUpBatches = 20;
+ int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
int _numBatches = 10000;
@@ -527,12 +542,14 @@
testMessage = _client.session[0].createTextMessage("start");
- for (int batch = 0; batch < batchSize; batch++)
+ for (int batch = 0; batch <= batchSize; batch++)
// while (_running)
{
long start = System.nanoTime();
testMessage.setText("start");
+ testMessage.setIntProperty("BATCH", batch);
+
_client.producer.send(testMessage);
_rateAdapter.sentMessage();
@@ -552,9 +569,12 @@
long sendtime = end - start;
- debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+ if (debugLog.isDebugEnabled())
+ {
+ debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+ }
- if (batch % 10 == 0)
+ if (batch % LOG_UPATE_INTERVAL == 0)
{
log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
}
@@ -583,23 +603,17 @@
return;
}
- //Slow down if gap between send and received is too large
- if (_sent - _totalReceived / delays.size() > _messageVariance)
- {
- //pause between batches.
- debugLog.info("Sleeping to keep sent in check with received");
- log.debug("Increaseing _delay as sending more than receiving");
- _delay += TEN_MILLI_SEC;
- }
-
- //per batch sleep.. if sleep is to small to spread over the batch.
- if (_delay <= TEN_MILLI_SEC * _batchSize)
- {
- sleepLong(_delay);
- }
- else
+ if (!SLEEP_PER_MESSAGE)
{
- debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+ //per batch sleep.. if sleep is to small to spread over the batch.
+ if (_delay <= TEN_MILLI_SEC * _batchSize)
+ {
+ sleepLong(_delay);
+ }
+ else
+ {
+ debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+ }
}
}
@@ -617,10 +631,10 @@
* Adjust the delay for sending messages based on this update from the client
*
* @param client The client that send this update
- * @param totalReceived The number of messages that this client has received.
* @param duration The time taken for the last batch of messagse
+ * @param batchNumber The reported batchnumber from the client
*/
- private void adjustDelay(String client, long totalReceived, long duration)
+ private void adjustDelay(String client, int batchNumber, long duration)
{
//Retrieve the current total time taken for this client.
Long currentTime = delays.get(client);
@@ -637,23 +651,28 @@
delays.put(client, currentTime);
+ long batchesSent = _sent / _batchSize;
+
+ // ensure we don't divide by zero
+ if (batchesSent == 0)
+ {
+ batchesSent = 1L;
+ }
_totalReceived += _batchSize;
_totalDuration += duration;
- // Calculate the number of messages in the batch.
- long batchCount = (_totalReceived / _batchSize);
-
//calculate average duration accross clients per batch
- long averageDuration = _totalDuration / delays.size() / batchCount;
+ long averageDuration = _totalDuration / delays.size() / batchesSent;
//calculate the difference between current send delay and average report delay
long diff = (duration) - averageDuration;
- if (debugLog.isInfoEnabled())
+ if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
{
- debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers"
- + " on batch: " + batchCount
+ debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers."
+ + " on batch: " + batchesSent
+ + " received batch: " + batchNumber
+ " Batch Duration: " + duration
+ " Average: " + averageDuration
+ " so diff: " + diff + " for : " + client
@@ -696,6 +715,16 @@
delayStable();
}
+ // If we have a consumer that is behind with the batches.
+ if (batchesSent - batchNumber > _batchVariance)
+ {
+ debugLog.debug("Increasing _delay as sending more than receiving");
+
+ _delay += 2 * TEN_MILLI_SEC;
+ delayChanged();
+ }
+
+
}
/** Reset the number of iterations before we say the delay has stabilised. */
@@ -725,10 +754,11 @@
*
* @param client The client identifier to check
* @param received the number of messages received by that client
+ * @param batchNumber
*/
- private void recordSlow(String client, long received)
+ private void recordSlow(String client, long received, int batchNumber)
{
- if (received < (_sent - _messageVariance))
+ if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
{
_slowClients.put(client, received);
}
@@ -761,6 +791,13 @@
}
}
}
+ else
+ {
+ if (SLEEP_PER_MESSAGE && (_delay > 0))
+ {
+ sleepLong(_delay / _batchSize);
+ }
+ }
}
@@ -771,16 +808,38 @@
*/
private boolean checkForSlowClients()
{
- if (_sent % _batchSize == 0)
+ // This will allways be true as we are running this at the end of each batchSize
+// if (_sent % _batchSize == 0)
{
// Cause test to pause when we have slow
if (!_slowClients.isEmpty() || NO_CLIENTS)
{
- debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray());
+
while (!_slowClients.isEmpty())
{
- debugLog.info(_slowClients.size() + " slow clients.");
+ if (debugLog.isInfoEnabled()
+ && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0)
+ {
+ String clients = "";
+ Iterator it = _slowClients.keySet().iterator();
+ while (it.hasNext())
+ {
+ clients += it.next();
+ if (it.hasNext())
+ {
+ clients += ", ";
+ }
+ }
+ debugLog.info("Pausing for slow clients:" + clients);
+ }
+
+
+ if (log.isDebugEnabled()
+ && _sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+ {
+ log.debug(_slowClients.size() + " slow clients.");
+ }
sleep(PAUSE_SLEEP);
}
@@ -794,7 +853,11 @@
}
else
{
- debugLog.info("Delay:" + _delay);
+ if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+ {
+ log.info("Total Delay :" + _delay + " "
+ + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")"));
+ }
}
}
@@ -825,7 +888,7 @@
* Perform the sleep , swallowing any InteruptException.
*
* NOTE: If a sleep request is > 10s then reset only sleep for 5s
- *
+ *
* @param milli to sleep for
* @param nano sub miliseconds to sleep for
*/
Modified: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java (original)
+++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java Wed Jun 27 08:49:51 2007
@@ -113,6 +113,7 @@
setPropertiesOnMessage(assignSender, testProperties);
assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignSender.setStringProperty("ROLE", "SENDER");
+ assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");
senderConversation.send(senderControlTopic, assignSender);
@@ -170,6 +171,7 @@
setPropertiesOnMessage(assignReceiver, _testProperties);
assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignReceiver.setStringProperty("ROLE", "RECEIVER");
+ assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName);
receiverConversation.send(receiverControlTopic, assignReceiver);
Propchange: incubator/qpid/trunk/qpid/java/management/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Jun 27 08:49:51 2007
@@ -1,2 +1,4 @@
target
*.iml
+org.apache.qpid.management.ui.ipr
+org.apache.qpid.management.ui.iws
Modified: incubator/qpid/trunk/qpid/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/pom.xml?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/pom.xml Wed Jun 27 08:49:51 2007
@@ -391,6 +391,29 @@
<version>0.5</version>
</plugin>
+ <plugin>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <version>1.0-alpha-5</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>process</goal>
+ </goals>
+ <configuration>
+ <resourceBundles>
+ <resourceBundle>org.apache:apache-incubator-disclaimer-resource-bundle:1.1</resourceBundle>
+ <resourceBundle>org.apache:apache-jar-resource-bundle:1.2</resourceBundle>
+ </resourceBundles>
+ <properties>
+ <addLicense>true</addLicense>
+ <projectName>Apache Qpid</projectName>
+ </properties>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
</plugins>
</pluginManagement>