You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/02 23:29:35 UTC
svn commit: r781177 [11/11] - in /activemq/sandbox/activemq-flow:
activemq-bio/ activemq-bio/src/main/java/org/
activemq-bio/src/main/java/org/apache/
activemq-bio/src/main/java/org/apache/activemq/
activemq-bio/src/main/java/org/apache/activemq/transp...
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transaction;
+
+/**
+ * Set of completion callbacks that can be registered with a
+ * {@link Transaction}. Every callback is a no-op in this class, so subclasses
+ * only need to override the ones they are interested in.
+ *
+ * @version $Revision$
+ */
+public class Synchronization {
+
+    /**
+     * Callback meant to run before the transaction ends. Does nothing unless
+     * overridden.
+     *
+     * @throws Exception if an overriding implementation fails
+     */
+    public void beforeEnd() throws Exception {
+        // intentionally empty
+    }
+
+    /**
+     * Callback run after the transaction has committed. Does nothing unless
+     * overridden.
+     *
+     * @throws Exception if an overriding implementation fails
+     */
+    public void afterCommit() throws Exception {
+        // intentionally empty
+    }
+
+    /**
+     * Callback run after the transaction has rolled back. Does nothing unless
+     * overridden.
+     *
+     * @throws Exception if an overriding implementation fails
+     */
+    public void afterRollback() throws Exception {
+        // intentionally empty
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Keeps track of all the actions that need to be done when a transaction does a
+ * commit or rollback.
+ *
+ * @version $Revision: 1.5 $
+ */
+public abstract class Transaction {
+
+    public static final byte START_STATE = 0; // can go to: 1,2,3
+    public static final byte IN_USE_STATE = 1; // can go to: 2,3
+    public static final byte PREPARED_STATE = 2; // can go to: 3
+    public static final byte FINISHED_STATE = 3;
+
+    /** Callbacks registered with this transaction, in registration order. */
+    private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
+    private byte state = START_STATE;
+
+    /**
+     * @return the current state; normally one of the {@code *_STATE} constants
+     */
+    public byte getState() {
+        return state;
+    }
+
+    /**
+     * Sets the state without validating the transition.
+     *
+     * @param state the new state
+     */
+    public void setState(byte state) {
+        this.state = state;
+    }
+
+    /**
+     * Registers a callback. A transaction that is still in
+     * {@link #START_STATE} moves to {@link #IN_USE_STATE}.
+     *
+     * @param r the callback to register
+     */
+    public void addSynchronization(Synchronization r) {
+        synchronizations.add(r);
+        if (state == START_STATE) {
+            state = IN_USE_STATE;
+        }
+    }
+
+    /**
+     * Removes a previously registered callback; the state is left untouched.
+     *
+     * @param r the callback to remove
+     */
+    public void removeSynchronization(Synchronization r) {
+        synchronizations.remove(r);
+    }
+
+    /**
+     * Checks that prepare may be called in the current state.
+     *
+     * @throws XAException with error code {@link XAException#XAER_PROTO} when
+     *             the state is neither {@link #START_STATE} nor
+     *             {@link #IN_USE_STATE}
+     * @throws Exception declared for overriding implementations
+     */
+    public void prePrepare() throws Exception {
+
+        // Is it ok to call prepare now given the state of the
+        // transaction?
+        switch (state) {
+        case START_STATE:
+        case IN_USE_STATE:
+            break;
+        default:
+            XAException xae = new XAException("Prepare cannot be called now.");
+            xae.errorCode = XAException.XAER_PROTO;
+            throw xae;
+        }
+
+        // // Run the prePrepareTasks
+        // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
+        // Callback r = (Callback) iter.next();
+        // r.execute();
+        // }
+    }
+
+    /**
+     * Invokes {@link Synchronization#afterCommit()} on every registered
+     * callback, in registration order.
+     *
+     * @throws Exception the first failure raised by a callback; later
+     *             callbacks are not invoked
+     */
+    protected void fireAfterCommit() throws Exception {
+        for (Synchronization s : synchronizations) {
+            s.afterCommit();
+        }
+    }
+
+    /**
+     * Invokes {@link Synchronization#afterRollback()} on every registered
+     * callback, in reverse registration order.
+     *
+     * @throws Exception the first failure raised by a callback; later
+     *             callbacks are not invoked
+     */
+    public void fireAfterRollback() throws Exception {
+        // Reverse a snapshot rather than the shared list: reversing in place
+        // would flip the stored order on every call, so a repeated rollback
+        // notification (or a later fireAfterCommit/toString) would observe the
+        // callbacks in the wrong order.
+        ArrayList<Synchronization> reversed = new ArrayList<Synchronization>(synchronizations);
+        Collections.reverse(reversed);
+        for (Synchronization s : reversed) {
+            s.afterRollback();
+        }
+    }
+
+    public String toString() {
+        return super.toString() + "[synchronizations=" + synchronizations + "]";
+    }
+
+    /**
+     * Commits the transaction; implemented by subclasses.
+     *
+     * @param onePhase flag passed through to the implementation
+     */
+    public abstract void commit(boolean onePhase) throws XAException, IOException;
+
+    /** Rolls the transaction back; implemented by subclasses. */
+    public abstract void rollback() throws XAException, IOException;
+
+    /**
+     * Prepares the transaction; implemented by subclasses.
+     *
+     * @return an implementation-defined result code
+     */
+    public abstract int prepare() throws XAException, IOException;
+
+    /**
+     * @return the identifier of this transaction
+     */
+    public abstract TransactionId getTransactionId();
+
+    /**
+     * @return {@code true} when the state is {@link #PREPARED_STATE}
+     */
+    public boolean isPrepared() {
+        return getState() == PREPARED_STATE;
+    }
+
+    /**
+     * @return the number of registered callbacks
+     */
+    public int size() {
+        return synchronizations.size();
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java?rev=781177&r1=781176&r2=781177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java Tue Jun 2 21:29:30 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq;
+import javax.jms.JMSException;
+
import org.apache.activemq.command.ActiveMQTempDestination;
@@ -27,5 +29,5 @@
boolean isObjectMessageSerializationDefered();
- void deleteTempDestination(ActiveMQTempDestination activeMQTempDestination);
+ void deleteTempDestination(ActiveMQTempDestination activeMQTempDestination) throws JMSException;
}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.util.Timer;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Used to make sure that commands are arriving periodically from the peer of
+ * the transport.
+ * <p>
+ * Monitoring starts once both the local and the remote {@link WireFormatInfo}
+ * have passed through this filter and the smaller of their max inactivity
+ * durations is positive. From then on a write check sends a
+ * {@link KeepAliveInfo} whenever nothing was sent since the previous check, and
+ * a read check reports an {@link InactivityIOException} whenever nothing was
+ * received since the previous check.
+ *
+ * @version $Revision$
+ */
+public class InactivityMonitor extends TransportFilter {
+
+    private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
+    /** Runs keep-alive sends and failure notifications off the timer threads. */
+    private static final ThreadPoolExecutor ASYNC_TASKS;
+
+    // Shared by all monitors and guarded by the InactivityMonitor.class lock.
+    // The timers exist only while CHECKER_COUNTER is greater than zero.
+    private static int CHECKER_COUNTER;
+    private static Timer READ_CHECK_TIMER;
+    private static Timer WRITE_CHECK_TIMER;
+
+    private WireFormatInfo localWireFormatInfo;
+    private WireFormatInfo remoteWireFormatInfo;
+    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandSent = new AtomicBoolean(false);
+    private final AtomicBoolean inSend = new AtomicBoolean(false);
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
+    private final AtomicBoolean inReceive = new AtomicBoolean(false);
+    private SchedulerTimerTask writeCheckerTask;
+    private SchedulerTimerTask readCheckerTask;
+
+    private long readCheckTime;
+    private long writeCheckTime;
+    private long initialDelayTime;
+
+    private WireFormat wireFormat;
+
+    /** Timer callback that rate-limits and then performs the read check. */
+    private final Runnable readChecker = new Runnable() {
+        long lastRunTime;
+        public void run() {
+            long now = System.currentTimeMillis();
+            long elapsed = (now-lastRunTime);
+
+            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
+                LOG.debug(""+elapsed+" ms elapsed since last read check.");
+            }
+
+            // Perhaps the timer executed a read check late.. and then executes
+            // the next read check on time which causes the time elapsed between
+            // read checks to be small..
+
+            // If less than 90% of the read check Time elapsed then abort this readcheck.
+            if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
+                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
+                return;
+            }
+
+            lastRunTime = now;
+            readCheck();
+        }
+    };
+
+    /**
+     * @param elapsed milliseconds since the last read check that was carried out
+     * @return {@code true} when more than 90% of the read check time has elapsed
+     */
+    private boolean allowReadCheck(long elapsed) {
+        return elapsed > (readCheckTime * 9 / 10);
+    }
+
+    /** Timer callback that performs the write check. */
+    private final Runnable writeChecker = new Runnable() {
+        long lastRunTime;
+        public void run() {
+            long now = System.currentTimeMillis();
+            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
+                LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
+
+            }
+            lastRunTime = now;
+            writeCheck();
+        }
+    };
+
+    /**
+     * @param next the transport this filter wraps
+     * @param wireFormat consulted by the read check to see whether a receive is
+     *            in progress
+     */
+    public InactivityMonitor(Transport next, WireFormat wireFormat) {
+        super(next);
+        this.wireFormat = wireFormat;
+    }
+
+    /** Cancels the checks of this monitor, then stops the wrapped transport. */
+    public void stop() throws Exception {
+        stopMonitorThreads();
+        next.stop();
+    }
+
+    /**
+     * Sends a {@link KeepAliveInfo} asynchronously when nothing was sent since
+     * the previous write check, then clears the "command sent" flag. The check
+     * is skipped entirely while a send is in progress.
+     */
+    final void writeCheck() {
+        if (inSend.get()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A send is in progress");
+            }
+            return;
+        }
+
+        if (!commandSent.get()) {
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
+            }
+            ASYNC_TASKS.execute(new Runnable() {
+                public void run() {
+                    // The monitor may have been stopped before this task ran.
+                    if (monitorStarted.get()) {
+                        try {
+
+                            KeepAliveInfo info = new KeepAliveInfo();
+                            info.setResponseRequired(true);
+                            oneway(info);
+                        } catch (IOException e) {
+                            onException(e);
+                        }
+                    }
+                }
+            });
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message sent since last write check, resetting flag");
+            }
+        }
+
+        commandSent.set(false);
+    }
+
+    /**
+     * Reports an {@link InactivityIOException} asynchronously when nothing was
+     * received since the previous read check, then clears the "command
+     * received" flag. The check is skipped entirely while a receive is in
+     * progress.
+     */
+    final void readCheck() {
+        if (inReceive.get() || wireFormat.inReceive()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A receive is in progress");
+            }
+            return;
+        }
+        if (!commandReceived.get()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            }
+            ASYNC_TASKS.execute(new Runnable() {
+                public void run() {
+                    onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+                }
+
+            });
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message received since last read check, resetting flag: ");
+            }
+        }
+        commandReceived.set(false);
+    }
+
+    /**
+     * Records that a command arrived and dispatches it. A
+     * {@link KeepAliveInfo} is answered when a response is required and is not
+     * passed to the listener. A {@link WireFormatInfo} is remembered as the
+     * remote wire format, which may start the monitor, and is then passed on
+     * like any other command.
+     *
+     * @param command the inbound command
+     */
+    public void onCommand(Object command) {
+        commandReceived.set(true);
+        inReceive.set(true);
+        try {
+            if (command.getClass() == KeepAliveInfo.class) {
+                KeepAliveInfo info = (KeepAliveInfo) command;
+                if (info.isResponseRequired()) {
+                    try {
+                        info.setResponseRequired(false);
+                        oneway(info);
+                    } catch (IOException e) {
+                        onException(e);
+                    }
+                }
+            } else {
+                if (command.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        IOException error = null;
+                        remoteWireFormatInfo = (WireFormatInfo) command;
+                        try {
+                            startMonitorThreads();
+                        } catch (IOException e) {
+                            error = e;
+                        }
+                        if (error != null) {
+                            onException(error);
+                        }
+                    }
+                }
+                // Delivery to the listener is serialized on the readChecker monitor.
+                synchronized (readChecker) {
+                    transportListener.onCommand(command);
+                }
+            }
+        } finally {
+
+            inReceive.set(false);
+        }
+    }
+
+    /**
+     * Sends a command through the wrapped transport. A {@link WireFormatInfo}
+     * is remembered as the local wire format, which may start the monitor.
+     *
+     * @param o the command to send
+     * @throws InactivityIOException when this monitor has already failed
+     * @throws IOException when the wrapped transport fails to send
+     */
+    public void oneway(Object o) throws IOException {
+        // Disable inactivity monitoring while processing a command.
+        // Synchronize this method - it's not synchronized
+        // further down the transport stack and gets called by more
+        // than one thread by this class
+        synchronized(inSend) {
+            inSend.set(true);
+            try {
+
+                if( failed.get() ) {
+                    throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
+                }
+                if (o.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        localWireFormatInfo = (WireFormatInfo)o;
+                        startMonitorThreads();
+                    }
+                }
+                next.oneway(o);
+            } finally {
+                commandSent.set(true);
+                inSend.set(false);
+            }
+        }
+    }
+
+    /**
+     * Marks the monitor as failed, cancels its checks and forwards the error
+     * to the listener. Only the first error is forwarded.
+     *
+     * @param error the failure to report
+     */
+    public void onException(IOException error) {
+        if (failed.compareAndSet(false, true)) {
+            stopMonitorThreads();
+            transportListener.onException(error);
+        }
+    }
+
+    /**
+     * Schedules the read and write checks once both wire format infos are
+     * known. Does nothing when the monitor is already started, when either
+     * info is still missing, or when the negotiated read check time is not
+     * positive.
+     */
+    private synchronized void startMonitorThreads() throws IOException {
+        if (monitorStarted.get()) {
+            return;
+        }
+        if (localWireFormatInfo == null) {
+            return;
+        }
+        if (remoteWireFormatInfo == null) {
+            return;
+        }
+
+        readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+        initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
+        if (readCheckTime > 0) {
+            monitorStarted.set(true);
+            writeCheckerTask = new SchedulerTimerTask(writeChecker);
+            readCheckerTask = new SchedulerTimerTask(readChecker);
+            // Timer.scheduleAtFixedRate rejects a period <= 0, which plain
+            // integer division by 3 would produce for a read check time of
+            // 1 or 2 ms; fall back to the read check time in that case.
+            writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
+            synchronized( InactivityMonitor.class ) {
+                if( CHECKER_COUNTER == 0 ) {
+                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
+                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
+                }
+                CHECKER_COUNTER++;
+                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
+                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
+            }
+        }
+    }
+
+    /**
+     * Cancels the checks of this monitor when it is started, and disposes of
+     * the shared timers when this was the last started monitor.
+     */
+    private synchronized void stopMonitorThreads() {
+        if (monitorStarted.compareAndSet(true, false)) {
+            readCheckerTask.cancel();
+            writeCheckerTask.cancel();
+            synchronized( InactivityMonitor.class ) {
+                WRITE_CHECK_TIMER.purge();
+                READ_CHECK_TIMER.purge();
+                CHECKER_COUNTER--;
+                if(CHECKER_COUNTER==0) {
+                    WRITE_CHECK_TIMER.cancel();
+                    READ_CHECK_TIMER.cancel();
+                    WRITE_CHECK_TIMER = null;
+                    READ_CHECK_TIMER = null;
+                }
+            }
+        }
+    }
+
+
+    static {
+        // Unbounded pool of daemon threads with direct hand-off; idle threads
+        // are kept for 10 seconds.
+        ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * A holder for the different thread priorities used in ActiveMQ.
+ *
+ * @version $Revision: 1.9 $
+ */
+public interface ThreadPriorities {
+
+    /** Priority value 6. */
+    int INBOUND_BROKER_CONNECTION = 6;
+
+    /** Priority value 6. */
+    int OUT_BOUND_BROKER_DISPATCH = 6;
+
+    /** Priority value 7. */
+    int INBOUND_CLIENT_CONNECTION = 7;
+
+    /** Priority value 7. */
+    int INBOUND_CLIENT_SESSION = 7;
+
+    /** Priority value 9, the highest value defined here. */
+    int BROKER_MANAGEMENT = 9;
+}
Propchange: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * Thrown when the transport layer detects that the underlying socket has been
+ * inactive for too long.
+ *
+ * @version $Revision$
+ */
+public class InactivityIOException extends IOException {
+
+    private static final long serialVersionUID = 5816001466763503220L;
+
+    /** Creates the exception without a detail message. */
+    public InactivityIOException() {
+    }
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public InactivityIOException(String message) {
+        super(message);
+    }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.URI;
+
+import org.apache.activemq.util.ServiceSupport;
+
+/**
+ * A useful base class for implementations of {@link TransportServer}. It
+ * stores the connect URI, the bind location and the accept listener.
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportServerSupport extends ServiceSupport implements TransportServer {
+
+    private URI connectURI;
+    private URI bindLocation;
+    private TransportAcceptListener acceptListener;
+
+    /** Creates a server with no connect URI and no bind location. */
+    public TransportServerSupport() {
+    }
+
+    /**
+     * Creates a server that uses the same URI for connecting and binding.
+     *
+     * @param location the initial connect URI and bind location
+     */
+    public TransportServerSupport(URI location) {
+        this.bindLocation = location;
+        this.connectURI = location;
+    }
+
+    /**
+     * @return the registered accept listener, or {@code null} when none is set
+     */
+    public TransportAcceptListener getAcceptListener() {
+        return acceptListener;
+    }
+
+    /**
+     * Registers the accept listener, replacing any previous one.
+     *
+     * @param acceptListener the listener to register
+     */
+    public void setAcceptListener(TransportAcceptListener acceptListener) {
+        this.acceptListener = acceptListener;
+    }
+
+    /**
+     * @return the connect URI
+     */
+    public URI getConnectURI() {
+        return connectURI;
+    }
+
+    /**
+     * @param location the connect URI to set
+     */
+    public void setConnectURI(URI location) {
+        this.connectURI = location;
+    }
+
+    /**
+     * Forwards an accept failure to the accept listener; the failure is
+     * dropped when no listener is registered.
+     *
+     * @param e the failure to report
+     */
+    protected void onAcceptError(Exception e) {
+        if (acceptListener == null) {
+            return;
+        }
+        acceptListener.onAcceptError(e);
+    }
+
+    /**
+     * @return the bind location
+     */
+    public URI getBindLocation() {
+        return bindLocation;
+    }
+
+    /**
+     * @param bindLocation the bind location to set
+     */
+    public void setBindLocation(URI bindLocation) {
+        this.bindLocation = bindLocation;
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.URI;
+
+import org.apache.activemq.ThreadPriorities;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A useful base class for implementations of {@link TransportServer} which uses
+ * a background thread to accept new connections.
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportServerThreadSupport extends TransportServerSupport implements Runnable {
+ private static final Log LOG = LogFactory.getLog(TransportServerThreadSupport.class);
+
+ private boolean daemon = true;
+ private boolean joinOnStop = true;
+ private Thread runner;
+ // should be a multiple of 128k
+ private long stackSize;
+
+ public TransportServerThreadSupport() {
+ }
+
+ public TransportServerThreadSupport(URI location) {
+ super(location);
+ }
+
+ public boolean isDaemon() {
+ return daemon;
+ }
+
+ /**
+ * Sets whether the background read thread is a daemon thread or not
+ */
+ public void setDaemon(boolean daemon) {
+ this.daemon = daemon;
+ }
+
+ public boolean isJoinOnStop() {
+ return joinOnStop;
+ }
+
+ /**
+ * Sets whether the background read thread is joined with (waited for) on a
+ * stop
+ */
+ public void setJoinOnStop(boolean joinOnStop) {
+ this.joinOnStop = joinOnStop;
+ }
+
+ protected void doStart() throws Exception {
+ LOG.info("Listening for connections at: " + getConnectURI());
+ runner = new Thread(null, this, "ActiveMQ Transport Server: " + toString(), stackSize);
+ runner.setDaemon(daemon);
+ runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
+ runner.start();
+ }
+
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ if (runner != null && joinOnStop) {
+ runner.join();
+ runner = null;
+ }
+ }
+
+ /**
+ * @return the stackSize
+ */
+ public long getStackSize() {
+ return this.stackSize;
+ }
+
+ /**
+ * @param stackSize the stackSize to set
+ */
+ public void setStackSize(long stackSize) {
+ this.stackSize = stackSize;
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A useful base class for transport implementations. It holds the transport
+ * listener and supplies default implementations for the optional parts of
+ * {@link Transport}.
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportSupport extends ServiceSupport implements Transport {
+    private static final Log LOG = LogFactory.getLog(TransportSupport.class);
+
+    TransportListener transportListener;
+
+    /**
+     * @return the current transport listener, or {@code null} when none is set
+     */
+    public TransportListener getTransportListener() {
+        return transportListener;
+    }
+
+    /**
+     * Registers the listener for inbound commands and exceptions, replacing
+     * any previous one.
+     *
+     * @param commandListener the listener to register
+     */
+    public void setTransportListener(TransportListener commandListener) {
+        this.transportListener = commandListener;
+    }
+
+    /**
+     * Narrows this transport to the requested type.
+     *
+     * @param target the type asked for
+     * @return {@code this} cast to {@code target} when this object's class is
+     *         assignable to it, otherwise {@code null}
+     */
+    public <T> T narrow(Class<T> target) {
+        return target.isAssignableFrom(getClass()) ? target.cast(this) : null;
+    }
+
+    /** Not supported by this base class; always throws {@link AssertionError}. */
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    /** Not supported by this base class; always throws {@link AssertionError}. */
+    public Object request(Object command) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    /** Not supported by this base class; always throws {@link AssertionError}. */
+    public Object request(Object command, int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    /**
+     * Hands an inbound command to the transport listener. A {@code null}
+     * command is ignored; a command that arrives while no listener is
+     * registered is logged as an error and dropped.
+     */
+    public void doConsume(Object command) {
+        if (command == null) {
+            return;
+        }
+        if (transportListener == null) {
+            LOG.error("No transportListener available to process inbound command: " + command);
+            return;
+        }
+        transportListener.onCommand(command);
+    }
+
+    /**
+     * Passes an IO exception on to the transport listener; the exception is
+     * dropped when no listener is registered.
+     */
+    public void onException(IOException e) {
+        if (transportListener == null) {
+            return;
+        }
+        transportListener.onException(e);
+    }
+
+    /**
+     * @throws IOException when {@code isStarted()} reports {@code false}
+     */
+    protected void checkStarted() throws IOException {
+        if (isStarted()) {
+            return;
+        }
+        throw new IOException("The transport is not running.");
+    }
+
+    /**
+     * @return always {@code false} in this base class
+     */
+    public boolean isFaultTolerant() {
+        return false;
+    }
+
+
+    /** Not supported by this base class; always throws {@link IOException}. */
+    public void reconnect(URI uri) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    /**
+     * @return the value of {@code isStopped()}
+     */
+    public boolean isDisposed() {
+        return isStopped();
+    }
+
+    /**
+     * @return the value of {@code isStarted()}
+     */
+    public boolean isConnected() {
+        return isStarted();
+    }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+/**
+ * Base class for transports that do their reading on a dedicated background
+ * thread. Subclasses supply {@link #run()}; {@link #doStart()} creates and
+ * starts the thread that executes it.
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class TransportThreadSupport extends TransportSupport implements Runnable {
+
+    /** Whether the background thread is created as a daemon thread. */
+    private boolean daemon;
+    /** The background thread created by the most recent {@link #doStart()}. */
+    private Thread runner;
+    /** Stack size handed to the thread constructor; should be a multiple of 128k. */
+    private long stackSize;
+
+    /**
+     * Creates the background thread for this transport, applies the daemon
+     * flag and starts it.
+     */
+    protected void doStart() throws Exception {
+        final String threadName = "ActiveMQ Transport: " + toString();
+        final Thread thread = new Thread(null, this, threadName, stackSize);
+        thread.setDaemon(daemon);
+        runner = thread;
+        thread.start();
+    }
+
+    /**
+     * @return true if the background thread is created as a daemon thread
+     */
+    public boolean isDaemon() {
+        return this.daemon;
+    }
+
+    /**
+     * @param daemon true to create the background thread as a daemon thread
+     */
+    public void setDaemon(boolean daemon) {
+        this.daemon = daemon;
+    }
+
+    /**
+     * @return the stack size requested for the background thread
+     */
+    public long getStackSize() {
+        return stackSize;
+    }
+
+    /**
+     * @param stackSize the stack size to request for the background thread
+     */
+    public void setStackSize(long stackSize) {
+        this.stackSize = stackSize;
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Simple BitArray to enable setting multiple boolean values efficiently. Used
+ * instead of BitSet because BitSet does not allow for efficient serialization.
+ * Will store up to 64 boolean values, packed into a single {@code long}.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArray {
+    static final int LONG_SIZE = 64;
+    static final int INT_SIZE = 32;
+    static final int SHORT_SIZE = 16;
+    static final int BYTE_SIZE = 8;
+
+    /** {@code BIT_VALUES[i]} is the mask with only bit {@code i} set. */
+    private static final long[] BIT_VALUES = new long[LONG_SIZE];
+    static {
+        for (int i = 0; i < LONG_SIZE; i++) {
+            BIT_VALUES[i] = 1L << i;
+        }
+    }
+
+    private long bits;
+    private int length;
+
+    /**
+     * @return one more than the highest index ever passed to
+     *         {@link #set(int, boolean)}, or the length last read from a stream
+     */
+    public int length() {
+        return length;
+    }
+
+    /**
+     * @return the long containing the bits
+     */
+    public long getBits() {
+        return bits;
+    }
+
+    /**
+     * Set the boolean value at the index.
+     *
+     * @param index bit position, 0 to 63
+     * @param flag the new value
+     * @return the old value held at this index
+     * @throws ArrayIndexOutOfBoundsException if index is outside 0..63; the
+     *         array is left unchanged in that case
+     */
+    public boolean set(int index, boolean flag) {
+        // Look the mask up first so that an out-of-range index fails before
+        // either length or bits has been modified.
+        final long mask = BIT_VALUES[index];
+        length = Math.max(length, index + 1);
+        final boolean oldValue = (bits & mask) != 0;
+        if (flag) {
+            bits |= mask;
+        } else {
+            bits &= ~mask;
+        }
+        return oldValue;
+    }
+
+    /**
+     * @param index bit position, 0 to 63
+     * @return the boolean value at this index
+     * @throws ArrayIndexOutOfBoundsException if index is outside 0..63
+     */
+    public boolean get(int index) {
+        return (bits & BIT_VALUES[index]) != 0;
+    }
+
+    /**
+     * Reset all the bit values to false. The recorded length is not changed.
+     */
+    public void reset() {
+        bits = 0;
+    }
+
+    /**
+     * Reset all the bits to the value supplied. The recorded length is not
+     * changed.
+     *
+     * @param bits the new packed bit values
+     */
+    public void reset(long bits) {
+        this.bits = bits;
+    }
+
+    /**
+     * Write the bits to an output stream: one byte holding the length, then
+     * the bits in the narrowest of byte/short/int/long that holds
+     * {@code length} bits.
+     *
+     * @param dataOut the stream to write to
+     * @throws IOException if the underlying stream fails
+     */
+    public void writeToStream(DataOutput dataOut) throws IOException {
+        dataOut.writeByte(length);
+        if (length <= BYTE_SIZE) {
+            dataOut.writeByte((int)bits);
+        } else if (length <= SHORT_SIZE) {
+            dataOut.writeShort((int)bits);
+        } else if (length <= INT_SIZE) {
+            dataOut.writeInt((int)bits);
+        } else {
+            dataOut.writeLong(bits);
+        }
+    }
+
+    /**
+     * Read the bits from an input stream, in the layout produced by
+     * {@link #writeToStream(DataOutput)}.
+     *
+     * @param dataIn the stream to read from
+     * @throws IOException if the underlying stream fails
+     */
+    public void readFromStream(DataInput dataIn) throws IOException {
+        length = dataIn.readByte();
+        // The narrow values are read unsigned: a signed read would sign-extend
+        // a stored top bit (index 7, 15 or 31) into every higher bit of the long.
+        if (length <= BYTE_SIZE) {
+            bits = dataIn.readUnsignedByte();
+        } else if (length <= SHORT_SIZE) {
+            bits = dataIn.readUnsignedShort();
+        } else if (length <= INT_SIZE) {
+            bits = dataIn.readInt() & 0xFFFFFFFFL;
+        } else {
+            bits = dataIn.readLong();
+        }
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.LinkedList;
+
+/**
+ * Holder for many bitArrays - used for message audit
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArrayBin {
+
+ private LinkedList<BitArray> list;
+ private int maxNumberOfArrays;
+ private int firstIndex = -1;
+ private long lastInOrderBit=-1;
+
+ /**
+ * Create a BitArrayBin to a certain window size (number of messages to
+ * keep)
+ *
+ * @param windowSize
+ */
+ public BitArrayBin(int windowSize) {
+ maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1;
+ maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
+ list = new LinkedList<BitArray>();
+ for (int i = 0; i < maxNumberOfArrays; i++) {
+ list.add(null);
+ }
+ }
+
+ /**
+ * Set a bit
+ *
+ * @param index
+ * @param value
+ * @return true if set
+ */
+ public boolean setBit(long index, boolean value) {
+ boolean answer = false;
+ BitArray ba = getBitArray(index);
+ if (ba != null) {
+ int offset = getOffset(index);
+ if (offset >= 0) {
+ answer = ba.set(offset, value);
+ }
+ }
+ return answer;
+ }
+
+ /**
+ * Test if in order
+ * @param index
+ * @return true if next message is in order
+ */
+ public boolean isInOrder(long index) {
+ boolean result = false;
+ if (lastInOrderBit == -1) {
+ result = true;
+ } else {
+ result = lastInOrderBit + 1 == index;
+ }
+ lastInOrderBit = index;
+ return result;
+
+ }
+
+ /**
+ * Get the boolean value at the index
+ *
+ * @param index
+ * @return true/false
+ */
+ public boolean getBit(long index) {
+ boolean answer = index >= firstIndex;
+ BitArray ba = getBitArray(index);
+ if (ba != null) {
+ int offset = getOffset(index);
+ if (offset >= 0) {
+ answer = ba.get(offset);
+ return answer;
+ }
+ } else {
+ // gone passed range for previous bins so assume set
+ answer = true;
+ }
+ return answer;
+ }
+
+ /**
+ * Get the BitArray for the index
+ *
+ * @param index
+ * @return BitArray
+ */
+ private BitArray getBitArray(long index) {
+ int bin = getBin(index);
+ BitArray answer = null;
+ if (bin >= 0) {
+ if (bin >= maxNumberOfArrays) {
+ int overShoot = bin - maxNumberOfArrays + 1;
+ while (overShoot > 0) {
+ list.removeFirst();
+ firstIndex += BitArray.LONG_SIZE;
+ list.add(new BitArray());
+ overShoot--;
+ }
+
+ bin = maxNumberOfArrays - 1;
+ }
+ answer = list.get(bin);
+ if (answer == null) {
+ answer = new BitArray();
+ list.set(bin, answer);
+ }
+ }
+ return answer;
+ }
+
+ /**
+ * Get the index of the bin from the total index
+ *
+ * @param index
+ * @return the index of the bin
+ */
+ private int getBin(long index) {
+ int answer = 0;
+ if (firstIndex < 0) {
+ firstIndex = (int) (index - (index % BitArray.LONG_SIZE));
+ } else if (firstIndex >= 0) {
+ answer = (int)((index - firstIndex) / BitArray.LONG_SIZE);
+ }
+ return answer;
+ }
+
+ /**
+ * Get the offset into a bin from the total index
+ *
+ * @param index
+ * @return the relative offset into a bin
+ */
+ private int getOffset(long index) {
+ int answer = 0;
+ if (firstIndex >= 0) {
+ answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
+ }
+ return answer;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java Tue Jun 2 21:29:30 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.PrintWriter;
+
+/**
+ * A helper class for printing indented text. The indent is not applied
+ * automatically: callers emit it with {@link #printIndent()} before printing
+ * a line.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class IndentPrinter {
+
+    private int indentLevel;
+    private final String indent;
+    private final PrintWriter out;
+
+    /**
+     * Creates a printer that writes to {@code System.out} with the default
+     * indent string.
+     */
+    public IndentPrinter() {
+        this(new PrintWriter(System.out), " ");
+    }
+
+    /**
+     * @param out the writer to print to, using the default indent string
+     */
+    public IndentPrinter(PrintWriter out) {
+        this(out, " ");
+    }
+
+    /**
+     * @param out the writer to print to
+     * @param indent the text printed once per indent level
+     */
+    public IndentPrinter(PrintWriter out, String indent) {
+        this.out = out;
+        this.indent = indent;
+    }
+
+    /**
+     * Prints the string form of the value followed by a line terminator. A
+     * {@code null} value is printed as "null" rather than failing, matching
+     * {@link #println(String)}.
+     *
+     * @param value the value to print, may be {@code null}
+     */
+    public void println(Object value) {
+        out.println(value);
+    }
+
+    /**
+     * Prints the text followed by a line terminator.
+     *
+     * @param text the text to print
+     */
+    public void println(String text) {
+        out.println(text);
+    }
+
+    /**
+     * Prints the text without a line terminator.
+     *
+     * @param text the text to print
+     */
+    public void print(String text) {
+        out.print(text);
+    }
+
+    /**
+     * Prints the indent string once per current indent level; prints nothing
+     * when the level is zero or negative.
+     */
+    public void printIndent() {
+        for (int i = 0; i < indentLevel; i++) {
+            out.print(indent);
+        }
+    }
+
+    /**
+     * Prints a line terminator.
+     */
+    public void println() {
+        out.println();
+    }
+
+    /**
+     * Increases the indent level by one.
+     */
+    public void incrementIndent() {
+        ++indentLevel;
+    }
+
+    /**
+     * Decreases the indent level by one. The level is not clamped at zero.
+     */
+    public void decrementIndent() {
+        --indentLevel;
+    }
+
+    /**
+     * @return the current indent level
+     */
+    public int getIndentLevel() {
+        return indentLevel;
+    }
+
+    /**
+     * @param indentLevel the indent level to use from now on
+     */
+    public void setIndentLevel(int indentLevel) {
+        this.indentLevel = indentLevel;
+    }
+
+    /**
+     * Flushes the underlying writer.
+     */
+    public void flush() {
+        out.flush();
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java?rev=781177&r1=781176&r2=781177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java Tue Jun 2 21:29:30 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.util;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
@@ -86,4 +87,11 @@
exception.initCause(cause);
return exception;
}
+
+    /**
+     * Wraps an exception in an {@link InvalidSelectorException} that carries
+     * the same message and has the original exception as its cause.
+     *
+     * @param e the exception to wrap
+     * @return the new exception, for the caller to throw
+     */
+    public static InvalidSelectorException createInvalidSelectorException(Exception e) {
+        final InvalidSelectorException wrapped = new InvalidSelectorException(e.getMessage());
+        wrapped.initCause(e);
+        return wrapped;
+    }
+
}