You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/13 22:15:42 UTC
svn commit: r393912 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/ft/
main/java/org/apache/activemq/broker/region/ main/java/org/apache/active...
Author: rajdavies
Date: Thu Apr 13 13:15:35 2006
New Revision: 393912
URL: http://svn.apache.org/viewcvs?rev=393912&view=rev
Log:
fine tuning, client control commands etc.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr 13 13:15:35 2006
@@ -51,9 +51,11 @@
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
@@ -74,6 +76,7 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
@@ -163,6 +166,7 @@
protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats)
throws Exception {
this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
+ this.info.setManageable(true);
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
this.transport = transport;
@@ -1206,8 +1210,7 @@
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId());
- advisoryConsumer = new AdvisoryConsumer(this, consumerId);
-
+ advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
/**
@@ -1407,12 +1410,17 @@
} else if ( command.isBrokerInfo() ) {
this.brokerInfo = (BrokerInfo)command;
brokerInfoReceived.countDown();
+ this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration();
}
else if (command instanceof ControlCommand) {
onControlCommand((ControlCommand) command);
}
else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
onAsyncException(((ConnectionError)command).getException());
+ }else if (command instanceof ConnectionControl){
+ onConnectionControl((ConnectionControl) command);
+ }else if (command instanceof ConsumerControl){
+ onConsumerControl((ConsumerControl) command);
}
}
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
@@ -1451,6 +1459,10 @@
}
public void transportInterupted() {
+ for (Iterator i = this.sessions.iterator(); i.hasNext();) {
+ ActiveMQSession s = (ActiveMQSession) i.next();
+ s.clearMessagesInProgress();
+ }
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener) iter.next();
listener.transportInterupted();
@@ -1462,6 +1474,10 @@
TransportListener listener = (TransportListener) iter.next();
listener.transportResumed();
}
+ for (Iterator i = this.sessions.iterator(); i.hasNext();) {
+ ActiveMQSession s = (ActiveMQSession) i.next();
+ s.deliverAcks();
+ }
}
@@ -1713,6 +1729,30 @@
if (text.equals("shutdown")) {
log.info("JVM told to shutdown");
System.exit(0);
+ }
+ }
+ }
+
+ protected void onConnectionControl(ConnectionControl command){
+ if (command.isFaultTolerant()){
+ this.optimizeAcknowledge = false;
+ for(Iterator i=this.sessions.iterator();i.hasNext();){
+ ActiveMQSession s=(ActiveMQSession) i.next();
+ s.setOptimizeAcknowledge(false);
+ }
+ }
+ }
+
+ protected void onConsumerControl(ConsumerControl command){
+ if(command.isClose()){
+ for(Iterator i=this.sessions.iterator();i.hasNext();){
+ ActiveMQSession s=(ActiveMQSession) i.next();
+ s.close(command.getConsumerId());
+ }
+ }else{
+ for(Iterator i=this.sessions.iterator();i.hasNext();){
+ ActiveMQSession s=(ActiveMQSession) i.next();
+ s.setPrefetchSize(command.getConsumerId(),command.getPrefetch());
}
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Apr 13 13:15:35 2006
@@ -111,8 +111,8 @@
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
- private boolean optimizeAcknowledge;
-
+ private AtomicBoolean optimizeAcknowledge = new AtomicBoolean();
+
/**
* Create a MessageConsumer
*
@@ -182,6 +182,9 @@
}
this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
+ this.optimizeAcknowledge.set(session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
+ &&!info.isBrowser());
+ this.info.setOptimizedAcknowledge(this.optimizeAcknowledge.get());
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
@@ -189,8 +192,7 @@
this.session.removeConsumer(this);
throw e;
}
- this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
- &&!info.isDurable()&&!info.getDestination().isQueue();
+
if(session.connection.isStarted())
start();
}
@@ -509,15 +511,34 @@
}
}
- public void clearMessagesInProgress(){
+ void clearMessagesInProgress(){
unconsumedMessages.clear();
}
+
+ void deliverAcks(){
+ synchronized(optimizeAcknowledge){
+ if(this.optimizeAcknowledge.get()){
+ if(!deliveredMessages.isEmpty()){
+ MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
+ MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+ try{
+ session.asyncSendPacket(ack);
+ }catch(JMSException e){
+ log.error("Failed to delivered acknowledgements",e);
+ }
+ deliveredMessages.clear();
+ ackCounter=0;
+ }
+ }
+ }
+ }
public void dispose() throws JMSException {
if (!unconsumedMessages.isClosed()) {
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now. (session may still
// commit/rollback the acks).
+ deliverAcks();//only processes optimized acknowledgements
if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
acknowledge();
}
@@ -539,6 +560,18 @@
protected void checkMessageListener() throws JMSException {
session.checkMessageListener();
}
+
+ protected void setOptimizeAcknowledge(boolean value){
+ synchronized(optimizeAcknowledge){
+ deliverAcks();
+ optimizeAcknowledge.set(value);
+ }
+ }
+
+ protected void setPrefetchSize(int prefetch){
+ deliverAcks();
+ this.info.setPrefetchSize(prefetch);
+ }
private void beforeMessageIsConsumed(MessageDispatch md) {
md.setDeliverySequenceId(session.getNextDeliveryId());
@@ -557,18 +590,20 @@
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else if(session.isAutoAcknowledge()){
if(!deliveredMessages.isEmpty()){
- if(this.optimizeAcknowledge){
- ackCounter++;
- if(ackCounter>=(info.getPrefetchSize()*.75)){
- MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+ synchronized(optimizeAcknowledge){
+ if(this.optimizeAcknowledge.get()){
+ ackCounter++;
+ if(ackCounter>=(info.getPrefetchSize()*.75)){
+ MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+ session.asyncSendPacket(ack);
+ ackCounter=0;
+ deliveredMessages.clear();
+ }
+ }else{
+ MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
- ackCounter=0;
deliveredMessages.clear();
}
- }else{
- MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
- session.asyncSendPacket(ack);
- deliveredMessages.clear();
}
}
}else if(session.isDupsOkAcknowledge()){
@@ -662,11 +697,12 @@
public void rollback() throws JMSException{
synchronized(unconsumedMessages.getMutex()){
- if(optimizeAcknowledge){
-
- // remove messages read but not acked at the broker yet through optimizeAcknowledge
- for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
- deliveredMessages.removeLast();
+ synchronized(optimizeAcknowledge){
+ if(optimizeAcknowledge.get()){
+ // remove messages read but not acked at the broker yet through optimizeAcknowledge
+ for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
+ deliveredMessages.removeLast();
+ }
}
}
if(deliveredMessages.isEmpty())
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr 13 13:15:35 2006
@@ -517,6 +517,21 @@
connection.asyncSendPacket(info.createRemoveCommand());
}
}
+
+ void clearMessagesInProgress(){
+ executor.clearMessagesInProgress();
+ for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
+ consumer.clearMessagesInProgress();
+ }
+ }
+
+ void deliverAcks(){
+ for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
+ consumer.deliverAcks();
+ }
+ }
synchronized public void dispose() throws JMSException {
if (!closed) {
@@ -1704,6 +1719,38 @@
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
if( consumer.getMessageListener()!=null ) {
throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+ }
+ }
+ }
+
+ protected void setOptimizeAcknowledge(boolean value){
+ for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+ ActiveMQMessageConsumer c = (ActiveMQMessageConsumer) iter.next();
+ c.setOptimizeAcknowledge(value);
+ }
+ }
+
+ protected void setPrefetchSize(ConsumerId id,int prefetch){
+ for(Iterator iter=consumers.iterator();iter.hasNext();){
+ ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
+ if(c.getConsumerId().equals(id)){
+ c.setPrefetchSize(prefetch);
+ break;
+ }
+ }
+ }
+
+ protected void close(ConsumerId id){
+ for(Iterator iter=consumers.iterator();iter.hasNext();){
+ ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
+ if(c.getConsumerId().equals(id)){
+ try{
+ c.close();
+ }catch(JMSException e){
+ log.warn("Exception closing consumer",e);
+ }
+ log.warn("Closed consumer on Command");
+ break;
}
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu Apr 13 13:15:35 2006
@@ -234,4 +234,8 @@
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
+ /**
+ * @return true if fault tolerant
+ */
+ public boolean isFaultTolerantConfiguration();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu Apr 13 13:15:35 2006
@@ -205,4 +205,9 @@
}
+
+ public boolean isFaultTolerantConfiguration(){
+ return next.isFaultTolerantConfiguration();
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java Thu Apr 13 13:15:35 2006
@@ -90,5 +90,10 @@
* Returns the statistics for this connection
*/
public ConnectionStatistics getStatistics();
+
+ /**
+ * @return true if the Connection will process control commands
+ */
+ public boolean isManageable();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu Apr 13 13:15:35 2006
@@ -198,6 +198,10 @@
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
}
+
+ public boolean isFaultTolerantConfiguration(){
+ return false;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu Apr 13 13:15:35 2006
@@ -200,6 +200,10 @@
throw new IllegalStateException(this.message);
}
+
+ public boolean isFaultTolerantConfiguration(){
+ throw new IllegalStateException(this.message);
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu Apr 13 13:15:35 2006
@@ -215,4 +215,8 @@
}
+ public boolean isFaultTolerantConfiguration(){
+ return getNext().isFaultTolerantConfiguration();
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Thu Apr 13 13:15:35 2006
@@ -18,11 +18,13 @@
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
@@ -67,11 +69,26 @@
*/
public void startProcessing(){
started.set(true);
+ try{
+ Connection[] connections=getClients();
+ ConnectionControl command=new ConnectionControl();
+ command.setFaultTolerant(true);
+ if(connections!=null){
+ for(int i=0;i<connections.length;i++){
+ if(connections[i].isActive()&&connections[i].isManageable()){
+ connections[i].dispatchAsync(command);
+ }
+ }
+ }
+ }catch(Exception e){
+ log.error("Failed to get Connections",e);
+ }
}
/**
* stop the broker
- * @throws Exception
+ *
+ * @throws Exception
*/
public void stop() throws Exception{
super.stop();
@@ -301,6 +318,10 @@
sendToSlave(ack);
super.acknowledge(context, ack);
+ }
+
+ public boolean isFaultTolerantConfiguration(){
+ return true;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Thu Apr 13 13:15:35 2006
@@ -131,6 +131,9 @@
connectionInfo.setUserName(userName);
connectionInfo.setPassword(password);
localBroker.oneway(connectionInfo);
+ ConnectionInfo remoteInfo=new ConnectionInfo();
+ connectionInfo.copy(remoteInfo);
+ remoteInfo.setBrokerMasterConnector(true);
remoteBroker.oneway(connectionInfo);
sessionInfo=new SessionInfo(connectionInfo,1);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Apr 13 13:15:35 2006
@@ -470,6 +470,10 @@
public Set getDurableDestinations(){
return adaptor != null ? adaptor.getDestinations() : Collections.EMPTY_SET;
}
+
+ public boolean isFaultTolerantConfiguration(){
+ return false;
+ }
protected void doStop(ServiceStopper ss) {
@@ -486,6 +490,8 @@
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Thu Apr 13 13:15:35 2006
@@ -1,105 +1,132 @@
/**
- *
+ *
* Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
-
-
/**
- * When a client connects to a broker, the broker send the client a BrokerInfo
- * so that the client knows which broker node he's talking to and also any peers
- * that the node has in his cluster. This is the broker helping the client out
+ * When a client connects to a broker, the broker send the client a BrokerInfo so that the client knows which broker
+ * node he's talking to and also any peers that the node has in his cluster. This is the broker helping the client out
* in discovering other nodes in the cluster.
*
* @openwire:marshaller code="2"
* @version $Revision: 1.7 $
*/
-public class BrokerInfo extends BaseCommand {
-
+public class BrokerInfo extends BaseCommand{
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.BROKER_INFO;
BrokerId brokerId;
String brokerURL;
boolean slaveBroker;
-
+ boolean masterBroker;
+ boolean faultTolerantConfiguration;
BrokerInfo peerBrokerInfos[];
String brokerName;
-
- public boolean isBrokerInfo() {
+
+ public boolean isBrokerInfo(){
return true;
}
- public byte getDataStructureType() {
+ public byte getDataStructureType(){
return DATA_STRUCTURE_TYPE;
}
-
+
/**
* @openwire:property version=1 cache=true
*/
- public BrokerId getBrokerId() {
+ public BrokerId getBrokerId(){
return brokerId;
}
- public void setBrokerId(BrokerId brokerId) {
- this.brokerId = brokerId;
+
+ public void setBrokerId(BrokerId brokerId){
+ this.brokerId=brokerId;
}
/**
* @openwire:property version=1
*/
- public String getBrokerURL() {
+ public String getBrokerURL(){
return brokerURL;
}
- public void setBrokerURL(String brokerURL) {
- this.brokerURL = brokerURL;
+
+ public void setBrokerURL(String brokerURL){
+ this.brokerURL=brokerURL;
}
/**
* @openwire:property version=1 testSize=0
*/
- public BrokerInfo[] getPeerBrokerInfos() {
+ public BrokerInfo[] getPeerBrokerInfos(){
return peerBrokerInfos;
}
- public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
- this.peerBrokerInfos = peerBrokerInfos;
+
+ public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos){
+ this.peerBrokerInfos=peerBrokerInfos;
}
/**
* @openwire:property version=1
*/
- public String getBrokerName() {
+ public String getBrokerName(){
return brokerName;
}
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
+
+ public void setBrokerName(String brokerName){
+ this.brokerName=brokerName;
}
-
- public Response visit(CommandVisitor visitor) throws Exception {
- return visitor.processBrokerInfo( this );
+
+ public Response visit(CommandVisitor visitor) throws Exception{
+ return visitor.processBrokerInfo(this);
}
/**
- * @openwire:property version=1 cache=true
+ * @openwire:property version=1
*/
public boolean isSlaveBroker(){
return slaveBroker;
}
-
public void setSlaveBroker(boolean slaveBroker){
this.slaveBroker=slaveBroker;
}
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isMasterBroker(){
+ return masterBroker;
+ }
+
+ /**
+ * @param masterBroker
+ * The masterBroker to set.
+ */
+ public void setMasterBroker(boolean masterBroker){
+ this.masterBroker=masterBroker;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the faultTolerantConfiguration.
+ */
+ public boolean isFaultTolerantConfiguration(){
+ return faultTolerantConfiguration;
+ }
+
+ /**
+ * @param faultTolerantConfiguration
+ * The faultTolerantConfiguration to set.
+ */
+ public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration){
+ this.faultTolerantConfiguration=faultTolerantConfiguration;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java Thu Apr 13 13:15:35 2006
@@ -47,6 +47,8 @@
byte CONTROL_COMMAND = 14;
byte FLUSH_COMMAND = 15;
byte CONNECTION_ERROR = 16;
+ byte CONSUMER_CONTROL = 17;
+ byte CONNECTION_CONTROL = 18;
///////////////////////////////////////////////////
//
@@ -124,6 +126,11 @@
byte BOOLEAN_TYPE = 78;
byte BYTE_ARRAY_TYPE = 79;
+
+
+
+
+
///////////////////////////////////////////////////
//
// Broker to Broker command objects
@@ -133,6 +140,7 @@
byte MESSAGE_DISPATCH_NOTIFICATION = 90;
byte NETWORK_BRIDGE_FILTER = 91;
+
///////////////////////////////////////////////////
//
// Data structures contained in the command objects.
@@ -153,6 +161,9 @@
byte PRODUCER_ID = 123;
byte BROKER_ID = 124;
+
+
+
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1,119 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+/**
+ * Used to start and stop transports as well as terminating clients.
+ *
+ * @openwire:marshaller code="18"
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ConnectionControl extends BaseCommand{
+ public static final byte DATA_STRUCTURE_TYPE=CommandTypes.CONNECTION_CONTROL;
+ protected boolean suspend;
+ protected boolean resume;
+ protected boolean close;
+ protected boolean exit;
+ protected boolean faultTolerant;
+
+ public byte getDataStructureType(){
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception{
+ return null;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the close.
+ */
+ public boolean isClose(){
+ return close;
+ }
+
+ /**
+ * @param close
+ * The close to set.
+ */
+ public void setClose(boolean close){
+ this.close=close;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the exit.
+ */
+ public boolean isExit(){
+ return exit;
+ }
+
+ /**
+ * @param exit
+ * The exit to set.
+ */
+ public void setExit(boolean exit){
+ this.exit=exit;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the faultTolerant.
+ */
+ public boolean isFaultTolerant(){
+ return faultTolerant;
+ }
+
+ /**
+ * @param faultTolerant
+ * The faultTolerant to set.
+ */
+ public void setFaultTolerant(boolean faultTolerant){
+ this.faultTolerant=faultTolerant;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the resume.
+ */
+ public boolean isResume(){
+ return resume;
+ }
+
+ /**
+ * @param resume
+ * The resume to set.
+ */
+ public void setResume(boolean resume){
+ this.resume=resume;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the suspend.
+ */
+ public boolean isSuspend(){
+ return suspend;
+ }
+
+ /**
+ * @param suspend
+ * The suspend to set.
+ */
+ public void setSuspend(boolean suspend){
+ this.suspend=suspend;
+ }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Thu Apr 13 13:15:35 2006
@@ -33,6 +33,8 @@
protected String userName;
protected String password;
protected BrokerId[] brokerPath;
+ protected boolean brokerMasterConnector;
+ protected boolean manageable;
public ConnectionInfo() {
}
@@ -43,6 +45,16 @@
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
+
+ public void copy(ConnectionInfo copy) {
+ super.copy(copy);
+ copy.clientId = clientId;
+ copy.userName = userName;
+ copy.password = password;
+ copy.brokerPath = brokerPath;
+ copy.brokerMasterConnector = brokerMasterConnector;
+ copy.manageable = manageable;
+ }
/**
* @openwire:property version=1 cache=true
@@ -104,6 +116,30 @@
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processAddConnection( this );
+ }
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isBrokerMasterConnector(){
+ return brokerMasterConnector;
+ }
+ /**
+ * @param brokerMasterConnector The brokerMasterConnector to set.
+ */
+ public void setBrokerMasterConnector(boolean slaveBroker){
+ this.brokerMasterConnector=slaveBroker;
+ }
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isManageable(){
+ return manageable;
+ }
+ /**
+ * @param manageable The manageable to set.
+ */
+ public void setManageable(boolean manageable){
+ this.manageable=manageable;
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1,115 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Used to start and stop transports as well as terminating clients.
+ *
+ * @openwire:marshaller code="17"
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ConsumerControl extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_CONTROL;
+
+ protected ConsumerId consumerId;
+ protected boolean close;
+ protected int prefetch;
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+
+
+
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return null;
+ }
+
+
+
+
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the close.
+ */
+ public boolean isClose(){
+ return close;
+ }
+
+
+
+
+
+ /**
+ * @param close The close to set.
+ */
+ public void setClose(boolean close){
+ this.close=close;
+ }
+
+
+
+
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the consumerId.
+ */
+ public ConsumerId getConsumerId(){
+ return consumerId;
+ }
+
+
+
+
+
+ /**
+ * @param consumerId The consumerId to set.
+ */
+ public void setConsumerId(ConsumerId consumerId){
+ this.consumerId=consumerId;
+ }
+
+
+
+
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the prefetch.
+ */
+ public int getPrefetch(){
+ return prefetch;
+ }
+
+
+
+
+
+ /**
+ * @param prefetch The prefetch to set.
+ */
+ public void setPrefetch(int prefetch){
+ this.prefetch=prefetch;
+ }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java Thu Apr 13 13:15:35 2006
@@ -48,6 +48,7 @@
protected boolean retroactive;
protected byte priority;
protected BrokerId[] brokerPath;
+ protected boolean optimizedAcknowledge;
protected BooleanExpression additionalPredicate;
protected transient boolean networkSubscription; //this subscription originated from a network connection
@@ -304,6 +305,21 @@
*/
public void setNetworkSubscription(boolean networkSubscription){
this.networkSubscription=networkSubscription;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the optimizedAcknowledge.
+ */
+ public boolean isOptimizedAcknowledge(){
+ return optimizedAcknowledge;
+ }
+
+ /**
+ * @param optimizedAcknowledge The optimizedAcknowledge to set.
+ */
+ public void setOptimizedAcknowledge(boolean optimizedAcknowledge){
+ this.optimizedAcknowledge=optimizedAcknowledge;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BrokerInfoMarshaller.java Thu Apr 13 13:15:35 2006
@@ -81,6 +81,8 @@
}
info.setBrokerName(tightUnmarshalString(dataIn, bs));
info.setSlaveBroker(bs.readBoolean());
+ info.setMasterBroker(bs.readBoolean());
+ info.setFaultTolerantConfiguration(bs.readBoolean());
}
@@ -98,6 +100,8 @@
rc += tightMarshalObjectArray1(wireFormat, info.getPeerBrokerInfos(), bs);
rc += tightMarshalString1(info.getBrokerName(), bs);
bs.writeBoolean(info.isSlaveBroker());
+ bs.writeBoolean(info.isMasterBroker());
+ bs.writeBoolean(info.isFaultTolerantConfiguration());
return rc + 0;
}
@@ -118,6 +122,8 @@
tightMarshalObjectArray2(wireFormat, info.getPeerBrokerInfos(), dataOut, bs);
tightMarshalString2(info.getBrokerName(), dataOut, bs);
bs.readBoolean();
+ bs.readBoolean();
+ bs.readBoolean();
}
@@ -148,6 +154,8 @@
}
info.setBrokerName(looseUnmarshalString(dataIn));
info.setSlaveBroker(dataIn.readBoolean());
+ info.setMasterBroker(dataIn.readBoolean());
+ info.setFaultTolerantConfiguration(dataIn.readBoolean());
}
@@ -165,6 +173,8 @@
looseMarshalObjectArray(wireFormat, info.getPeerBrokerInfos(), dataOut);
looseMarshalString(info.getBrokerName(), dataOut);
dataOut.writeBoolean(info.isSlaveBroker());
+ dataOut.writeBoolean(info.isMasterBroker());
+ dataOut.writeBoolean(info.isFaultTolerantConfiguration());
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionControlMarshaller.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v1;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Marshalling code for Open Wire Format for ConnectionControlMarshaller
*
*
* NOTE!: This file is auto generated - do not modify!
* if yo
u need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision$
*/
public class ConnectionControlMarshaller extends BaseCommandMarshaller {
/**
* Return the type of Data Structure we marshal
* @return short representation of the type data structure
*/
public byte getDataStructureType() {
return ConnectionControl.DATA_STRUCTURE_TYPE;
}
/**
* @return a new object instance
*/
public DataStructure createObject() {
return new ConnectionControl();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream
bs) throws IOException {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
ConnectionControl info = (ConnectionControl)o;
info.setClose(bs.readBoolean());
info.setExit(bs.readBoolean());
info.setFaultTolerant(bs.readBoolean());
info.setResume(bs.readBoolean());
info.setSuspend(bs.readBoolean());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
ConnectionControl info = (ConnectionControl)o;
int rc = super.tightMarshal1(wireFormat, o, bs);
bs.writeBoolean(info.isClose());
bs.writeBoolean(info.isExit());
bs.writeBoolean(info.isFaultTolerant());
bs.writeBoolean(info.isResume());
bs.writeBoolean(info.isSuspend());
return rc + 0;
}
/**
* Write a object instance to data output stream
*
* @pa
ram o the instance to be marshaled
* @param dataOut the output stream
* @throws IOException thrown if an error occurs
*/
public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
super.tightMarshal2(wireFormat, o, dataOut, bs);
ConnectionControl info = (ConnectionControl)o;
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException {
super.looseUnmarshal(wireFormat, o, dataIn);
ConnectionControl info = (ConnectionControl)o;
info.setClose(dat
aIn.readBoolean());
info.setExit(dataIn.readBoolean());
info.setFaultTolerant(dataIn.readBoolean());
info.setResume(dataIn.readBoolean());
info.setSuspend(dataIn.readBoolean());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException {
ConnectionControl info = (ConnectionControl)o;
super.looseMarshal(wireFormat, o, dataOut);
dataOut.writeBoolean(info.isClose());
dataOut.writeBoolean(info.isExit());
dataOut.writeBoolean(info.isFaultTolerant());
dataOut.writeBoolean(info.isResume());
dataOut.writeBoolean(info.isSuspend());
}
}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConnectionInfoMarshaller.java Thu Apr 13 13:15:35 2006
@@ -81,6 +81,8 @@
else {
info.setBrokerPath(null);
}
+ info.setBrokerMasterConnector(bs.readBoolean());
+ info.setManageable(bs.readBoolean());
}
@@ -98,6 +100,8 @@
rc += tightMarshalString1(info.getPassword(), bs);
rc += tightMarshalString1(info.getUserName(), bs);
rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
+ bs.writeBoolean(info.isBrokerMasterConnector());
+ bs.writeBoolean(info.isManageable());
return rc + 0;
}
@@ -118,6 +122,8 @@
tightMarshalString2(info.getPassword(), dataOut, bs);
tightMarshalString2(info.getUserName(), dataOut, bs);
tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
+ bs.readBoolean();
+ bs.readBoolean();
}
@@ -148,6 +154,8 @@
else {
info.setBrokerPath(null);
}
+ info.setBrokerMasterConnector(dataIn.readBoolean());
+ info.setManageable(dataIn.readBoolean());
}
@@ -165,6 +173,8 @@
looseMarshalString(info.getPassword(), dataOut);
looseMarshalString(info.getUserName(), dataOut);
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
+ dataOut.writeBoolean(info.isBrokerMasterConnector());
+ dataOut.writeBoolean(info.isManageable());
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerControlMarshaller.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v1;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Marshalling code for Open Wire Format for ConsumerControlMarshaller
*
*
* NOTE!: This file is auto generated - do not modify!
* if you
need to make a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision$
*/
public class ConsumerControlMarshaller extends BaseCommandMarshaller {
/**
* Return the type of Data Structure we marshal
* @return short representation of the type data structure
*/
public byte getDataStructureType() {
return ConsumerControl.DATA_STRUCTURE_TYPE;
}
/**
* @return a new object instance
*/
public DataStructure createObject() {
return new ConsumerControl();
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) thr
ows IOException {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
ConsumerControl info = (ConsumerControl)o;
info.setClose(bs.readBoolean());
info.setConsumerId((org.apache.activemq.command.ConsumerId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
info.setPrefetch(dataIn.readInt());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
ConsumerControl info = (ConsumerControl)o;
int rc = super.tightMarshal1(wireFormat, o, bs);
bs.writeBoolean(info.isClose());
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getConsumerId(), bs);
return rc + 4;
}
/**
* Write a object instance to data output stream
*
* @param o the instance to be marshaled
* @param dataOut the output stream
* @throws IOException thro
wn if an error occurs
*/
public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
super.tightMarshal2(wireFormat, o, dataOut, bs);
ConsumerControl info = (ConsumerControl)o;
bs.readBoolean();
tightMarshalNestedObject2(wireFormat, (DataStructure)info.getConsumerId(), dataOut, bs);
dataOut.writeInt(info.getPrefetch());
}
/**
* Un-marshal an object instance from the data input stream
*
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
*/
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException {
super.looseUnmarshal(wireFormat, o, dataIn);
ConsumerControl info = (ConsumerControl)o;
info.setClose(dataIn.readBoolean());
info.setConsumerId((org.apache.activemq.command
.ConsumerId) looseUnmarsalNestedObject(wireFormat, dataIn));
info.setPrefetch(dataIn.readInt());
}
/**
* Write the booleans that this object uses to a BooleanStream
*/
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException {
ConsumerControl info = (ConsumerControl)o;
super.looseMarshal(wireFormat, o, dataOut);
dataOut.writeBoolean(info.isClose());
looseMarshalNestedObject(wireFormat, (DataStructure)info.getConsumerId(), dataOut);
dataOut.writeInt(info.getPrefetch());
}
}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java Thu Apr 13 13:15:35 2006
@@ -91,6 +91,7 @@
}
info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
info.setNetworkSubscription(bs.readBoolean());
+ info.setOptimizedAcknowledge(bs.readBoolean());
}
@@ -115,6 +116,7 @@
rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getAdditionalPredicate(), bs);
bs.writeBoolean(info.isNetworkSubscription());
+ bs.writeBoolean(info.isOptimizedAcknowledge());
return rc + 9;
}
@@ -145,6 +147,7 @@
tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs);
tightMarshalNestedObject2(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut, bs);
bs.readBoolean();
+ bs.readBoolean();
}
@@ -185,6 +188,7 @@
}
info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) looseUnmarsalNestedObject(wireFormat, dataIn));
info.setNetworkSubscription(dataIn.readBoolean());
+ info.setOptimizedAcknowledge(dataIn.readBoolean());
}
@@ -212,6 +216,7 @@
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut);
dataOut.writeBoolean(info.isNetworkSubscription());
+ dataOut.writeBoolean(info.isOptimizedAcknowledge());
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java Thu Apr 13 13:15:35 2006
@@ -81,8 +81,10 @@
add(new DestinationInfoMarshaller());
add(new ShutdownInfoMarshaller());
add(new DataResponseMarshaller());
+ add(new ConnectionControlMarshaller());
add(new KeepAliveInfoMarshaller());
add(new FlushCommandMarshaller());
+ add(new ConsumerControlMarshaller());
add(new JournalTopicAckMarshaller());
add(new BrokerIdMarshaller());
add(new MessageDispatchMarshaller());
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Thu Apr 13 13:15:35 2006
@@ -1,60 +1,53 @@
/**
- *
+ *
* Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.transport;
import java.io.IOException;
-
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
-
-
/**
* @version $Revision: 1.5 $
*/
-public class TransportFilter extends DefaultTransportListener implements Transport {
-
+public class TransportFilter implements TransportListener,Transport{
final protected Transport next;
private TransportListener transportListener;
- public TransportFilter(Transport next) {
- this.next = next;
+ public TransportFilter(Transport next){
+ this.next=next;
}
- public TransportListener getTransportListener() {
+ public TransportListener getTransportListener(){
return transportListener;
}
-
- public void setTransportListener(TransportListener channelListener) {
- this.transportListener = channelListener;
- if (channelListener == null)
+
+ public void setTransportListener(TransportListener channelListener){
+ this.transportListener=channelListener;
+ if(channelListener==null)
next.setTransportListener(null);
else
next.setTransportListener(this);
}
-
/**
* @see org.apache.activemq.Service#start()
- * @throws IOException if the next channel has not been set.
+ * @throws IOException
+ * if the next channel has not been set.
*/
- public void start() throws Exception {
- if( next == null )
+ public void start() throws Exception{
+ if(next==null)
throw new IOException("The next channel has not been set.");
- if( transportListener == null )
+ if(transportListener==null)
throw new IOException("The command listener has not been set.");
next.start();
}
@@ -62,51 +55,57 @@
/**
* @see org.apache.activemq.Service#stop()
*/
- public void stop() throws Exception {
+ public void stop() throws Exception{
next.stop();
- }
+ }
- public void onCommand(Command command) {
+ public void onCommand(Command command){
transportListener.onCommand(command);
}
/**
* @return Returns the next.
*/
- public Transport getNext() {
+ public Transport getNext(){
return next;
}
-
- public String toString() {
+ public String toString(){
return next.toString();
}
- public void oneway(Command command) throws IOException {
+ public void oneway(Command command) throws IOException{
next.oneway(command);
}
- public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
- return next.asyncRequest(command, null);
+ public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
+ return next.asyncRequest(command,null);
}
- public Response request(Command command) throws IOException {
+ public Response request(Command command) throws IOException{
return next.request(command);
}
-
- public Response request(Command command,int timeout) throws IOException {
+
+ public Response request(Command command,int timeout) throws IOException{
return next.request(command,timeout);
}
- public void onException(IOException error) {
+ public void onException(IOException error){
transportListener.onException(error);
}
- public Object narrow(Class target) {
- if( target.isAssignableFrom(getClass()) ) {
+ public void transportInterupted(){
+ transportListener.transportInterupted();
+ }
+
+ public void transportResumed(){
+ transportListener.transportResumed();
+ }
+
+ public Object narrow(Class target){
+ if(target.isAssignableFrom(getClass())){
return this;
}
return next.narrow(target);
- }
-
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Apr 13 13:15:35 2006
@@ -78,7 +78,7 @@
private long reconnectDelay = initialReconnectDelay;
private Exception connectionFailure;
- private final TransportListener myTransportListener = new DefaultTransportListener() {
+ private final TransportListener myTransportListener = new TransportListener() {
public void onCommand(Command command) {
if (command == null) {
return;
@@ -113,6 +113,18 @@
transportListener.onException(new InterruptedIOException());
}
}
+
+ public void transportInterupted(){
+ if (transportListener != null){
+ transportListener.transportInterupted();
+ }
+ }
+
+ public void transportResumed(){
+ if(transportListener != null){
+ transportListener.transportResumed();
+ }
+ }
};
public FailoverTransport() throws InterruptedIOException {
@@ -147,9 +159,11 @@
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
+
if (started) {
restoreTransport(t);
}
+
log.debug("Connection established");
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
@@ -159,6 +173,7 @@
if (transportListener != null){
transportListener.transportResumed();
}
+
return false;
}
catch (Exception e) {
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java Thu Apr 13 13:15:35 2006
@@ -62,6 +62,8 @@
}
info.setBrokerName("BrokerName:4");
info.setSlaveBroker(true);
+ info.setMasterBroker(false);
+ info.setFaultTolerantConfiguration(true);
}
}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionControlTest.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v1;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Test case for the OpenWire marshalling for ConnectionControl
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to m
ake a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision: $
*/
public class ConnectionControlTest extends BaseCommandTestSupport {
public static ConnectionControlTest SINGLETON = new ConnectionControlTest();
public Object createObject() throws Exception {
ConnectionControl info = new ConnectionControl();
populateObject(info);
return info;
}
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
ConnectionControl info = (ConnectionControl) object;
info.setClose(true);
info.setExit(false);
info.setFaultTolerant(true);
info.setResume(false);
info.setSuspend(true);
}
}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java Thu Apr 13 13:15:35 2006
@@ -62,6 +62,8 @@
}
info.setBrokerPath(value);
}
+ info.setBrokerMasterConnector(true);
+ info.setManageable(false);
}
}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java?rev=393912&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerControlTest.java Thu Apr 13 13:15:35 2006
@@ -0,0 +1 @@
+/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.v1;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;
/**
* Test case for the OpenWire marshalling for ConsumerControl
*
*
* NOTE!: This file is auto generated - do not modify!
* if you need to mak
e a change, please see the modify the groovy scripts in the
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
* @version $Revision: $
*/
public class ConsumerControlTest extends BaseCommandTestSupport {
public static ConsumerControlTest SINGLETON = new ConsumerControlTest();
public Object createObject() throws Exception {
ConsumerControl info = new ConsumerControl();
populateObject(info);
return info;
}
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
ConsumerControl info = (ConsumerControl) object;
info.setClose(true);
info.setConsumerId(createConsumerId("ConsumerId:1"));
info.setPrefetch(1);
}
}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java?rev=393912&r1=393911&r2=393912&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java Thu Apr 13 13:15:35 2006
@@ -72,6 +72,7 @@
}
info.setAdditionalPredicate(createBooleanExpression("AdditionalPredicate:6"));
info.setNetworkSubscription(false);
+ info.setOptimizedAcknowledge(true);
}
}