You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC

svn commit: r781177 [11/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transp...

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transaction;
+
+/**
+ * @version $Revision$
+ */
+public class Synchronization {
+
+    public void beforeEnd() throws Exception {
+    }
+
+    public void afterCommit() throws Exception {
+    }
+
+    public void afterRollback() throws Exception {
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Keeps track of all the actions that need to be done when a transaction does a
+ * commit or rollback.
+ * 
+ * @version $Revision: 1.5 $
+ */
+public abstract class Transaction {
+
+    public static final byte START_STATE = 0; // can go to: 1,2,3
+    public static final byte IN_USE_STATE = 1; // can go to: 2,3
+    public static final byte PREPARED_STATE = 2; // can go to: 3
+    public static final byte FINISHED_STATE = 3;
+
+    private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
+    private byte state = START_STATE;
+
+    public byte getState() {
+        return state;
+    }
+
+    public void setState(byte state) {
+        this.state = state;
+    }
+
+    public void addSynchronization(Synchronization r) {
+        synchronizations.add(r);
+        if (state == START_STATE) {
+            state = IN_USE_STATE;
+        }
+    }
+
+    public void removeSynchronization(Synchronization r) {
+        synchronizations.remove(r);
+    }
+
+    public void prePrepare() throws Exception {
+
+        // Is it ok to call prepare now given the state of the
+        // transaction?
+        switch (state) {
+        case START_STATE:
+        case IN_USE_STATE:
+            break;
+        default:
+            XAException xae = new XAException("Prepare cannot be called now.");
+            xae.errorCode = XAException.XAER_PROTO;
+            throw xae;
+        }
+
+        // // Run the prePrepareTasks
+        // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
+        // Callback r = (Callback) iter.next();
+        // r.execute();
+        // }
+    }
+
+    protected void fireAfterCommit() throws Exception {
+        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+            Synchronization s = iter.next();
+            s.afterCommit();
+        }
+    }
+
+    /** Calls afterRollback() on each registered synchronization, most recently added first. */
+    public void fireAfterRollback() throws Exception {
+        // Index backwards instead of Collections.reverse(), which would reorder the shared list.
+        for (int i = synchronizations.size() - 1; i >= 0; i--) {
+            synchronizations.get(i).afterRollback();
+        }
+    }
+
+    public String toString() {
+        return super.toString() + "[synchronizations=" + synchronizations + "]";
+    }
+
+    public abstract void commit(boolean onePhase) throws XAException, IOException;
+
+    public abstract void rollback() throws XAException, IOException;
+
+    public abstract int prepare() throws XAException, IOException;
+
+    public abstract TransactionId getTransactionId();
+
+    public boolean isPrepared() {
+        return getState() == PREPARED_STATE;
+    }
+    
+    public int size() {
+        return synchronizations.size();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java?rev=781177&r1=781176&r2=781177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java Tue Jun  2 21:29:30 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq;
 
+import javax.jms.JMSException;
+
 import org.apache.activemq.command.ActiveMQTempDestination;
 
 
@@ -27,5 +29,5 @@
 
     boolean isObjectMessageSerializationDefered();
 
-    void deleteTempDestination(ActiveMQTempDestination activeMQTempDestination);
+    void deleteTempDestination(ActiveMQTempDestination activeMQTempDestination) throws JMSException;
 }

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.util.Timer;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Used to make sure that commands are arriving periodically from the peer of
+ * the transport.
+ * 
+ * @version $Revision$
+ */
+public class InactivityMonitor extends TransportFilter {
+
+    private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
+    private static final ThreadPoolExecutor ASYNC_TASKS;
+    
+    private static int CHECKER_COUNTER;
+    private static Timer  READ_CHECK_TIMER;
+    private static Timer  WRITE_CHECK_TIMER;
+    
+    private WireFormatInfo localWireFormatInfo;
+    private WireFormatInfo remoteWireFormatInfo;
+    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandSent = new AtomicBoolean(false);
+    private final AtomicBoolean inSend = new AtomicBoolean(false);
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
+    private final AtomicBoolean inReceive = new AtomicBoolean(false);
+    private SchedulerTimerTask writeCheckerTask;
+    private SchedulerTimerTask readCheckerTask;
+    
+    private long readCheckTime;
+    private long writeCheckTime;
+    private long initialDelayTime;
+    
+    private WireFormat wireFormat;
+    
+    private final Runnable readChecker = new Runnable() {
+        long lastRunTime;
+        public void run() {
+            long now = System.currentTimeMillis();
+            long elapsed = (now-lastRunTime);
+
+            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
+                LOG.debug(""+elapsed+" ms elapsed since last read check.");
+            }
+            
+            // Perhaps the timer executed a read check late.. and then executes
+            // the next read check on time which causes the time elapsed between
+            // read checks to be small..
+            
+            // If less than 90% of the read check Time elapsed then abort this readcheck. 
+            if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
+                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
+                return;
+            }
+            
+            lastRunTime = now;
+            readCheck();
+        }
+    };
+    
+    private boolean allowReadCheck(long elapsed) {
+        return elapsed > (readCheckTime * 9 / 10);
+    }
+
+    private final Runnable writeChecker = new Runnable() {
+        long lastRunTime;
+        public void run() {
+            long now = System.currentTimeMillis();
+            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
+                LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
+                
+            }
+            lastRunTime = now; 
+            writeCheck();
+        }
+    };
+
+    public InactivityMonitor(Transport next, WireFormat wireFormat) {
+        super(next);
+        this.wireFormat = wireFormat;
+    }
+
+    public void stop() throws Exception {
+    	stopMonitorThreads();
+        next.stop();
+    }
+
+    final void writeCheck() {
+        if (inSend.get()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A send is in progress");
+            }
+            return;
+        }
+
+        if (!commandSent.get()) {
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
+            }
+            ASYNC_TASKS.execute(new Runnable() {
+                public void run() {
+                    if (monitorStarted.get()) {
+                        try {
+
+                            KeepAliveInfo info = new KeepAliveInfo();
+                            info.setResponseRequired(true);
+                            oneway(info);
+                        } catch (IOException e) {
+                            onException(e);
+                        }
+                    }
+                };
+            });
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message sent since last write check, resetting flag");
+            }
+        }
+
+        commandSent.set(false);
+    }
+
+    final void readCheck() {
+        if (inReceive.get() || wireFormat.inReceive()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A receive is in progress");
+            }
+            return;
+        }
+        if (!commandReceived.get()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            }
+            ASYNC_TASKS.execute(new Runnable() {  
+                public void run() {
+                    onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+                };
+                
+            });
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message received since last read check, resetting flag: ");
+            }
+        }
+        commandReceived.set(false);
+    }
+
+    public void onCommand(Object command) {
+        commandReceived.set(true);
+        inReceive.set(true);
+        try {
+            if (command.getClass() == KeepAliveInfo.class) {
+                KeepAliveInfo info = (KeepAliveInfo) command;
+                if (info.isResponseRequired()) {
+                    try {
+                        info.setResponseRequired(false);
+                        oneway(info);
+                    } catch (IOException e) {
+                        onException(e);
+                    }
+                }
+            } else {
+                if (command.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        IOException error = null;
+                        remoteWireFormatInfo = (WireFormatInfo) command;
+                        try {
+                            startMonitorThreads();
+                        } catch (IOException e) {
+                            error = e;
+                        }
+                        if (error != null) {
+                            onException(error);
+                        }
+                    }
+                }
+                synchronized (readChecker) {
+                    transportListener.onCommand(command);
+                }
+            }
+        } finally {
+            
+            inReceive.set(false);
+        }
+    }
+
+    public void oneway(Object o) throws IOException {
+        // Disable inactivity monitoring while processing a command.
+        // Synchronize this method - it's not synchronized
+        // further down the transport stack and gets called by more
+        // than one thread by this class.
+        synchronized(inSend) {
+            inSend.set(true);
+            try {
+                
+                if( failed.get() ) {
+                    throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
+                }
+                if (o.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        localWireFormatInfo = (WireFormatInfo)o;
+                        startMonitorThreads();
+                    }
+                }
+                next.oneway(o);
+            } finally {
+                commandSent.set(true);
+                inSend.set(false);
+            }
+        }
+    }
+
+    public void onException(IOException error) {
+        if (failed.compareAndSet(false, true)) {
+            stopMonitorThreads();
+            transportListener.onException(error);
+        }
+    }    	
+
+    private synchronized void startMonitorThreads() throws IOException {
+        if (monitorStarted.get()) {
+            return;
+        }
+        if (localWireFormatInfo == null) {
+            return;
+        }
+        if (remoteWireFormatInfo == null) {
+            return;
+        }
+
+        readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+        initialDelayTime =  Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
+        if (readCheckTime > 0) {
+            monitorStarted.set(true);
+            writeCheckerTask = new SchedulerTimerTask(writeChecker);
+            readCheckerTask = new  SchedulerTimerTask(readChecker);
+            writeCheckTime = readCheckTime/3;
+            synchronized( InactivityMonitor.class ) {
+            	if( CHECKER_COUNTER == 0 ) {
+            	    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
+            	    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
+            	}
+            	CHECKER_COUNTER++;
+                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
+                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private synchronized void stopMonitorThreads() {
+        if (monitorStarted.compareAndSet(true, false)) {
+            readCheckerTask.cancel();
+            writeCheckerTask.cancel();
+            synchronized( InactivityMonitor.class ) {
+	            WRITE_CHECK_TIMER.purge();
+	            READ_CHECK_TIMER.purge();
+	            CHECKER_COUNTER--;
+	            if(CHECKER_COUNTER==0) {
+	            	WRITE_CHECK_TIMER.cancel();
+	            	READ_CHECK_TIMER.cancel();
+	            	WRITE_CHECK_TIMER = null;
+	            	READ_CHECK_TIMER = null;
+	            }
+            }
+        }
+    }
+    
+       
+    static {
+        ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+* A holder for different thread priorities used in ActiveMQ
+* 
+* @version $Revision: 1.9 $
+*/
+
+public interface ThreadPriorities {
+    int INBOUND_BROKER_CONNECTION = 6;
+    int OUT_BOUND_BROKER_DISPATCH = 6;
+    int INBOUND_CLIENT_CONNECTION = 7;
+    int INBOUND_CLIENT_SESSION = 7;
+    int BROKER_MANAGEMENT = 9;
+}

Propchange: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the transport layer detects that the underlying socket has been inactive for
+ * too long.
+ * 
+ * @version $Revision$
+ */
+public class InactivityIOException extends IOException {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 5816001466763503220L;
+
+    public InactivityIOException() {
+        super();
+    }
+
+    /**
+     * @param message
+     */
+    public InactivityIOException(String message) {
+        super(message);
+    }
+
+
+}

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.URI;
+
+import org.apache.activemq.util.ServiceSupport;
+
+/**
+ * A useful base class for implementations of {@link TransportServer}
+ * 
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportServerSupport extends ServiceSupport implements TransportServer {
+
+    private URI connectURI;
+    private URI bindLocation;
+    private TransportAcceptListener acceptListener;
+
+    public TransportServerSupport() {
+    }
+
+    public TransportServerSupport(URI location) {
+        this.connectURI = location;
+        this.bindLocation = location;
+    }
+
+    /**
+     * @return Returns the acceptListener.
+     */
+    public TransportAcceptListener getAcceptListener() {
+        return acceptListener;
+    }
+
+    /**
+     * Registers an accept listener
+     * 
+     * @param acceptListener
+     */
+    public void setAcceptListener(TransportAcceptListener acceptListener) {
+        this.acceptListener = acceptListener;
+    }
+
+    /**
+     * @return Returns the location.
+     */
+    public URI getConnectURI() {
+        return connectURI;
+    }
+
+    /**
+     * @param location The location to set.
+     */
+    public void setConnectURI(URI location) {
+        this.connectURI = location;
+    }
+
+    protected void onAcceptError(Exception e) {
+        if (acceptListener != null) {
+            acceptListener.onAcceptError(e);
+        }
+    }
+
+    public URI getBindLocation() {
+        return bindLocation;
+    }
+
+    public void setBindLocation(URI bindLocation) {
+        this.bindLocation = bindLocation;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.URI;
+
+import org.apache.activemq.ThreadPriorities;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A useful base class for implementations of {@link TransportServer} which uses
+ * a background thread to accept new connections.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportServerThreadSupport extends TransportServerSupport implements Runnable {
+    private static final Log LOG = LogFactory.getLog(TransportServerThreadSupport.class);
+
+    private boolean daemon = true;
+    private boolean joinOnStop = true;
+    private Thread runner;
+    // should be a multiple of 128k
+    private long stackSize;
+
+    public TransportServerThreadSupport() {
+    }
+
+    public TransportServerThreadSupport(URI location) {
+        super(location);
+    }
+
+    public boolean isDaemon() {
+        return daemon;
+    }
+
+    /**
+     * Sets whether the background accept thread is a daemon thread or not
+     */
+    public void setDaemon(boolean daemon) {
+        this.daemon = daemon;
+    }
+
+    public boolean isJoinOnStop() {
+        return joinOnStop;
+    }
+
+    /**
+     * Sets whether the background accept thread is joined with (waited for) on a
+     * stop
+     */
+    public void setJoinOnStop(boolean joinOnStop) {
+        this.joinOnStop = joinOnStop;
+    }
+
+    protected void doStart() throws Exception {
+        LOG.info("Listening for connections at: " + getConnectURI());
+        runner = new Thread(null, this, "ActiveMQ Transport Server: " + toString(), stackSize);
+        runner.setDaemon(daemon);
+        runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
+        runner.start();
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        if (runner != null && joinOnStop) {
+            runner.join();
+            runner = null;
+        }
+    }
+
+    /**
+     * @return the stackSize
+     */
+    public long getStackSize() {
+        return this.stackSize;
+    }
+
+    /**
+     * @param stackSize the stackSize to set
+     */
+    public void setStackSize(long stackSize) {
+        this.stackSize = stackSize;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A useful base class for transport implementations.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportSupport extends ServiceSupport implements Transport {
+    private static final Log LOG = LogFactory.getLog(TransportSupport.class);
+
+    TransportListener transportListener;
+
+    /**
+     * Returns the current transport listener, or null if none has been set
+     */
+    public TransportListener getTransportListener() {
+        return transportListener;
+    }
+
+    /**
+     * Registers the listener that receives inbound commands and IO exceptions
+     * 
+     * @param commandListener the listener to notify
+     */
+    public void setTransportListener(TransportListener commandListener) {
+        this.transportListener = commandListener;
+    }
+
+    /**
+     * Narrows this transport to the requested type
+     * 
+     * @param target the type the caller wants to view this transport as
+     * @return 'this' cast to target if assignable, otherwise null
+     */
+    public <T> T narrow(Class<T> target) {
+        if (target.isAssignableFrom(getClass())) {
+            return target.cast(this);
+        }
+        return null;
+    }
+
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public Object request(Object command) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public Object request(Object command, int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    /**
+     * Hands a non-null inbound command to the listener; logs an error if no listener is set
+     */
+    public void doConsume(Object command) {
+        if (command != null) {
+            if (transportListener != null) {
+                transportListener.onCommand(command);
+            } else {
+                LOG.error("No transportListener available to process inbound command: " + command);
+            }
+        }
+    }
+
+    /**
+     * Passes any IO exceptions into the transport listener; logs them if no listener is set
+     */
+    public void onException(IOException e) {
+        if (transportListener != null) {
+            transportListener.onException(e);
+        } else {
+            LOG.error("No transportListener available to handle exception: " + e, e);
+        }
+    }
+
+    protected void checkStarted() throws IOException {
+        if (!isStarted()) {
+            throw new IOException("The transport is not running.");
+        }
+    }
+
+    public boolean isFaultTolerant() {
+        return false;
+    }
+
+    public void reconnect(URI uri) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    public boolean isDisposed() {
+        return isStopped();
+    }
+
+    public boolean isConnected() {
+        return isStarted();
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+/**
+ * A useful base class for a transport implementation which has a background
+ * reading thread.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportThreadSupport extends TransportSupport implements Runnable {
+
+    private boolean daemon;
+    private Thread runner;
+    // requested stack size for the runner thread; should be a multiple of 128k
+    private long stackSize;
+
+    public boolean isDaemon() {
+        return this.daemon;
+    }
+
+    public void setDaemon(boolean daemon) {
+        this.daemon = daemon;
+    }
+
+    protected void doStart() throws Exception {
+        this.runner = new Thread(null, this, "ActiveMQ Transport: " + this, this.stackSize);
+        this.runner.setDaemon(this.daemon);
+        this.runner.start();
+    }
+
+    /**
+     * @return the stack size handed to the Thread constructor in doStart()
+     */
+    public long getStackSize() {
+        return stackSize;
+    }
+
+    /**
+     * @param stackSize the stack size to request for the thread created by doStart()
+     */
+    public void setStackSize(long stackSize) {
+        this.stackSize = stackSize;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Simple BitArray to enable setting multiple boolean values efficiently. Used
+ * instead of BitSet because BitSet does not allow for efficient serialization.
+ * Will store up to 64 boolean values, packed into a single long.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArray {
+    static final int LONG_SIZE = 64;
+    static final int INT_SIZE = 32;
+    static final int SHORT_SIZE = 16;
+    static final int BYTE_SIZE = 8;
+    private static final long[] BIT_VALUES = {0x0000000000000001L, 0x0000000000000002L, 0x0000000000000004L,
+                                              0x0000000000000008L, 0x0000000000000010L, 0x0000000000000020L,
+                                              0x0000000000000040L, 0x0000000000000080L, 0x0000000000000100L,
+                                              0x0000000000000200L, 0x0000000000000400L, 0x0000000000000800L,
+                                              0x0000000000001000L, 0x0000000000002000L, 0x0000000000004000L,
+                                              0x0000000000008000L, 0x0000000000010000L, 0x0000000000020000L,
+                                              0x0000000000040000L, 0x0000000000080000L, 0x0000000000100000L,
+                                              0x0000000000200000L, 0x0000000000400000L, 0x0000000000800000L,
+                                              0x0000000001000000L, 0x0000000002000000L, 0x0000000004000000L,
+                                              0x0000000008000000L, 0x0000000010000000L, 0x0000000020000000L,
+                                              0x0000000040000000L, 0x0000000080000000L, 0x0000000100000000L,
+                                              0x0000000200000000L, 0x0000000400000000L, 0x0000000800000000L,
+                                              0x0000001000000000L, 0x0000002000000000L, 0x0000004000000000L,
+                                              0x0000008000000000L, 0x0000010000000000L, 0x0000020000000000L,
+                                              0x0000040000000000L, 0x0000080000000000L, 0x0000100000000000L,
+                                              0x0000200000000000L, 0x0000400000000000L, 0x0000800000000000L,
+                                              0x0001000000000000L, 0x0002000000000000L, 0x0004000000000000L,
+                                              0x0008000000000000L, 0x0010000000000000L, 0x0020000000000000L,
+                                              0x0040000000000000L, 0x0080000000000000L, 0x0100000000000000L,
+                                              0x0200000000000000L, 0x0400000000000000L, 0x0800000000000000L,
+                                              0x1000000000000000L, 0x2000000000000000L, 0x4000000000000000L,
+                                              0x8000000000000000L};
+    private long bits;
+    private int length;
+
+    /**
+     * @return the highest index passed to set() plus one, or the length last read from a stream
+     */
+    public int length() {
+        return length;
+    }
+
+    /**
+     * @return the long containing the bits
+     */
+    public long getBits() {
+        return bits;
+    }
+
+    /**
+     * set the boolean value at the index
+     * 
+     * @param index the bit position; must be in the range 0 to 63
+     * @param flag the value to store
+     * @return the old value held at this index
+     */
+    public boolean set(int index, boolean flag) {
+        boolean oldValue = (bits & BIT_VALUES[index]) != 0; // a bad index fails here, before length changes
+        length = Math.max(length, index + 1);
+        if (flag) {
+            bits |= BIT_VALUES[index];
+        } else if (oldValue) {
+            bits &= ~(BIT_VALUES[index]);
+        }
+        return oldValue;
+    }
+
+    /**
+     * @param index the bit position; must be in the range 0 to 63
+     * @return the boolean value at this index
+     */
+    public boolean get(int index) {
+        return (bits & BIT_VALUES[index]) != 0;
+    }
+
+    /**
+     * reset all the bit values to false (length is left unchanged)
+     */
+    public void reset() {
+        bits = 0;
+    }
+
+    /**
+     * reset all the bits to the value supplied (length is left unchanged)
+     * 
+     * @param bits the new bit pattern
+     */
+    public void reset(long bits) {
+        this.bits = bits;
+    }
+
+    /**
+     * write the length byte, then the bits in the smallest of 1, 2, 4 or 8 bytes that holds length bits
+     * 
+     * @param dataOut the stream to write to
+     * @throws IOException if the underlying stream fails
+     */
+    public void writeToStream(DataOutput dataOut) throws IOException {
+        dataOut.writeByte(length);
+        if (length <= BYTE_SIZE) {
+            dataOut.writeByte((int)bits);
+        } else if (length <= SHORT_SIZE) {
+            dataOut.writeShort((short)bits);
+        } else if (length <= INT_SIZE) {
+            dataOut.writeInt((int)bits);
+        } else {
+            dataOut.writeLong(bits);
+        }
+    }
+
+    /**
+     * read the length and the bits from an input stream, in the format produced by writeToStream
+     * 
+     * @param dataIn the stream to read from
+     * @throws IOException if the underlying stream fails
+     */
+    public void readFromStream(DataInput dataIn) throws IOException {
+        length = dataIn.readByte();
+        if (length <= BYTE_SIZE) {
+            bits = dataIn.readUnsignedByte(); // unsigned: a set top bit must not sign-extend into higher bits
+        } else if (length <= SHORT_SIZE) {
+            bits = dataIn.readUnsignedShort();
+        } else if (length <= INT_SIZE) {
+            bits = dataIn.readInt() & 0xFFFFFFFFL; // mask off the sign extension of the int
+        } else {
+            bits = dataIn.readLong();
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.LinkedList;
+
+/**
+ * Holder for many bitArrays - used for message audit
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArrayBin {
+
+    private LinkedList<BitArray> list;
+    private int maxNumberOfArrays;
+    private long firstIndex = -1;
+    private long lastInOrderBit=-1;
+
+    /**
+     * Create a BitArrayBin to a certain window size (number of messages to
+     * keep)
+     * 
+     * @param windowSize the number of indexes the window should cover
+     */
+    public BitArrayBin(int windowSize) {
+        maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1;
+        maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
+        list = new LinkedList<BitArray>();
+        for (int i = 0; i < maxNumberOfArrays; i++) {
+            list.add(null);
+        }
+    }
+
+    /**
+     * Set a bit
+     * 
+     * @param index the absolute index of the bit
+     * @param value the value to store
+     * @return the previous value of the bit, or false if nothing was stored
+     */
+    public boolean setBit(long index, boolean value) {
+        boolean answer = false;
+        BitArray ba = getBitArray(index);
+        if (ba != null) {
+            int offset = getOffset(index);
+            if (offset >= 0) {
+                answer = ba.set(offset, value);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Test if in order, and record index as the most recent one seen
+     * @param index the index to compare with the previously recorded one
+     * @return true if index directly follows the previous one, or none was recorded yet
+     */
+    public boolean isInOrder(long index) {
+        // -1 marks that no index has been recorded yet, which counts as in order
+        boolean result;
+        if (lastInOrderBit == -1) {
+            result = true;
+        } else {
+            result = lastInOrderBit + 1 == index;
+        }
+        lastInOrderBit = index;
+        return result;
+    }
+
+    /**
+     * Get the boolean value at the index; may slide the window forward as a side effect
+     * 
+     * @param index the absolute index of the bit
+     * @return the stored value, or true when no bin is kept for the index any more
+     */
+    public boolean getBit(long index) {
+        boolean answer = index >= firstIndex;
+        BitArray ba = getBitArray(index);
+        if (ba != null) {
+            int offset = getOffset(index);
+            if (offset >= 0) {
+                answer = ba.get(offset);
+                return answer;
+            }
+        } else {
+            // gone past the range of the retained bins, so assume set
+            answer = true;
+        }
+        return answer;
+    }
+
+    /**
+     * Get the BitArray for the index, sliding the window forward if the index lies beyond it
+     * 
+     * @param index the absolute index of the bit
+     * @return the BitArray holding the index, or null if its bin number is negative
+     */
+    private BitArray getBitArray(long index) {
+        int bin = getBin(index);
+        BitArray answer = null;
+        if (bin >= 0) {
+            if (bin >= maxNumberOfArrays) {
+                int overShoot = bin - maxNumberOfArrays + 1;
+                while (overShoot > 0) {
+                    list.removeFirst();
+                    firstIndex += BitArray.LONG_SIZE;
+                    list.add(new BitArray());
+                    overShoot--;
+                }
+                // after sliding, the requested index lives in the last bin
+                bin = maxNumberOfArrays - 1;
+            }
+            answer = list.get(bin);
+            if (answer == null) {
+                answer = new BitArray();
+                list.set(bin, answer);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Get the index of the bin from the total index; the first call fixes the window start
+     * 
+     * @param index the absolute index of the bit
+     * @return the index of the bin
+     */
+    private int getBin(long index) {
+        int answer = 0;
+        if (firstIndex < 0) {
+            firstIndex = index - (index % BitArray.LONG_SIZE); // kept as a long so large indexes are not truncated
+        } else {
+            answer = (int)((index - firstIndex) / BitArray.LONG_SIZE);
+        }
+        return answer;
+    }
+
+    /**
+     * Get the offset into a bin from the total index
+     * 
+     * @param index the absolute index of the bit
+     * @return the relative offset into a bin
+     */
+    private int getOffset(long index) {
+        int answer = 0;
+        if (firstIndex >= 0) {
+            answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
+        }
+        return answer;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.PrintWriter;
+
+/**
+ * A helper class for printing indented text
+ *
+ * @version $Revision: 1.2 $
+ */
+public class IndentPrinter {
+
+    private int indentLevel;
+    private final String indent;
+    private final PrintWriter out;
+
+    public IndentPrinter() {
+        this(new PrintWriter(System.out), "  ");
+    }
+
+    public IndentPrinter(PrintWriter out) {
+        this(out, "  ");
+    }
+
+    public IndentPrinter(PrintWriter out, String indent) {
+        this.out = out;
+        this.indent = indent;
+    }
+
+    /** Prints the value (null prints as "null") followed by a line break. */
+    public void println(Object value) {
+        out.println(value);
+    }
+
+    /** Prints the text followed by a line break. */
+    public void println(String text) {
+        out.println(text);
+    }
+
+    public void print(String text) {
+        out.print(text);
+    }
+
+    public void printIndent() {
+        for (int i = 0; i < indentLevel; i++) {
+            out.print(indent);
+        }
+    }
+
+    public void println() {
+        out.println();
+    }
+
+    public void incrementIndent() {
+        ++indentLevel;
+    }
+
+    public void decrementIndent() {
+        --indentLevel;
+    }
+
+    public int getIndentLevel() {
+        return indentLevel;
+    }
+
+    public void setIndentLevel(int indentLevel) {
+        this.indentLevel = indentLevel;
+    }
+
+    public void flush() {
+        out.flush();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java?rev=781177&r1=781176&r2=781177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java Tue Jun  2 21:29:30 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.util;
 
+import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
@@ -86,4 +87,11 @@
         exception.initCause(cause);
         return exception;
     }
+    
+    public static InvalidSelectorException createInvalidSelectorException(Exception e) {
+        InvalidSelectorException exception = new InvalidSelectorException(e.getMessage());
+        exception.initCause(e);
+        return exception;
+    }
+    
 }