You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/12 22:29:31 UTC

svn commit: r1157238 [1/3] - in /activemq/trunk: activemq-core/ activemq-core/src/main/filtered-resources/ activemq-core/src/main/filtered-resources/org/ activemq-core/src/main/filtered-resources/org/apache/ activemq-core/src/main/filtered-resources/or...

Author: tabish
Date: Fri Aug 12 20:29:29 2011
New Revision: 1157238

URL: http://svn.apache.org/viewvc?rev=1157238&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3449

Added:
    activemq/trunk/activemq-core/src/main/filtered-resources/
    activemq/trunk/activemq-core/src/main/filtered-resources/org/
    activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/
    activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/activemq/
    activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/activemq/version.txt   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri Aug 12 20:29:29 2011
@@ -6,9 +6,9 @@
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  
+
   http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -98,7 +98,7 @@
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.core</artifactId>
     </dependency>
-    
+
 
     <dependency>
       <groupId>org.apache.activemq</groupId>
@@ -133,7 +133,7 @@
       <groupId>com.thoughtworks.xstream</groupId>
       <artifactId>xstream</artifactId>
       <optional>true</optional>
-    </dependency>        
+    </dependency>
     <dependency>
         <groupId>org.codehaus.jettison</groupId>
         <artifactId>jettison</artifactId>
@@ -150,7 +150,7 @@
     <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-context</artifactId>
-    </dependency>    
+    </dependency>
 
     <dependency>
       <groupId>org.apache.derby</groupId>
@@ -213,7 +213,7 @@
       <artifactId>spring-test</artifactId>
       <scope>test</scope>
     </dependency>
-    
+
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
@@ -224,7 +224,7 @@
       <artifactId>activemq-jmdns_1.0</artifactId>
       <optional>true</optional>
     </dependency>
-    
+
     <dependency>
       <groupId>org.jasypt</groupId>
       <artifactId>jasypt</artifactId>
@@ -372,8 +372,25 @@
       </plugin>
     </plugins>
     </reporting>
-    
+
   <build>
+
+    <resources>
+      <resource>
+        <directory>${project.basedir}/src/main/resources</directory>
+        <includes>
+          <include>**/*</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>${project.basedir}/src/main/filtered-resources</directory>
+        <filtering>true</filtering>
+        <includes>
+          <include>**/*</include>
+        </includes>
+      </resource>
+    </resources>
+
     <plugins>
       <plugin>
         <groupId>org.apache.felix</groupId>
@@ -384,7 +401,7 @@
           </instructions>
         </configuration>
       </plugin>
-      
+
       <!-- Configure which tests are included/excuded -->
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
@@ -405,7 +422,7 @@
                  Note: if you want to see log messages on the console window remove
                        "redirectTestOutputToFile" from the parent pom
             -->
-            <!--           
+            <!--
             <property>
               <name>log4j.configuration</name>
               <value>file:target/test-classes/log4j.properties</value>
@@ -426,7 +443,7 @@
 
             <!-- These are performance tests so take too long to run -->
             <exclude>**/perf/*</exclude>
-            
+
             <!-- These are load tests so take too long to run -->
             <exclude>**/load/*</exclude>
 
@@ -470,19 +487,19 @@
 
              <!-- A test used for memory profiling only. -->
              <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
-             
+
              <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
-             
+
              <!-- used just to test potential memory leaks manually -->
              <exclude>**/JDBCTestMemory.*</exclude>
-             
+
              <exclude>**/amq1490/*</exclude>
              <exclude>**/archive/*</exclude>
              <exclude>**/NetworkFailoverTest.*/**</exclude>
-             
+
              <exclude>**/vm/VMTransportBrokerTest.*</exclude>
              <exclude>**/broker/MarshallingBrokerTest.*</exclude>
-             
+
 
             <exclude>**/AMQDeadlockTest3.*</exclude>
 
@@ -491,7 +508,7 @@
 
             <!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
             <exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
-            
+
           </excludes>
         </configuration>
       </plugin>
@@ -519,28 +536,28 @@
           </filesets>
         </configuration>
       </plugin>
-      
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
 
         <executions>
-	        <execution>
+            <execution>
             <id>package</id>
             <phase>package</phase>
             <configuration>
               <tasks>
                 <echo>Deleting unwanted resources from the test-jar</echo>
                 <delete dir="${project.build.directory}/test-classes" verbose="true">
-									<include name="*.*" />
-								</delete>
+                                    <include name="*.*" />
+                                </delete>
               </tasks>
             </configuration>
             <goals>
               <goal>run</goal>
             </goals>
           </execution>
-  
+
           <execution>
             <id>site</id>
             <phase>site</phase>
@@ -626,7 +643,7 @@
         <artifactId>cobertura-maven-plugin</artifactId>
         <version>2.0</version>
         <configuration>
-	  <check>
+      <check>
             <branchRate>50</branchRate>
             <lineRate>50</lineRate>
             <haltOnFailure>true</haltOnFailure>
@@ -634,12 +651,12 @@
             <totalLineRate>50</totalLineRate>
           </check>
         </configuration>
-      </plugin>      
+      </plugin>
     </plugins>
   </build>
 
   <profiles>
-    
+
     <profile>
       <id>openwire-generate</id>
       <dependencies>
@@ -675,7 +692,7 @@
         </plugins>
       </build>
     </profile>
-    
+
   </profiles>
 
 </project>

Added: activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/activemq/version.txt
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/activemq/version.txt?rev=1157238&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/activemq/version.txt (added)
+++ activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/activemq/version.txt Fri Aug 12 20:29:29 2011
@@ -0,0 +1 @@
+${project.version}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/filtered-resources/org/apache/activemq/version.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1157238&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java Fri Aug 12 20:29:29 2011
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.util.Timer;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to make sure that commands are arriving periodically from the peer of
+ * the transport.
+ */
+public abstract class AbstractInactivityMonitor extends TransportFilter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
+
+    private static ThreadPoolExecutor ASYNC_TASKS;
+    private static int CHECKER_COUNTER;
+    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
+    private static Timer READ_CHECK_TIMER;
+    private static Timer WRITE_CHECK_TIMER;
+
+    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandSent = new AtomicBoolean(false);
+    private final AtomicBoolean inSend = new AtomicBoolean(false);
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
+    private final AtomicBoolean inReceive = new AtomicBoolean(false);
+    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
+
+    private SchedulerTimerTask writeCheckerTask;
+    private SchedulerTimerTask readCheckerTask;
+
+    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
+    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
+    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
+    private boolean useKeepAlive = true;
+    private boolean keepAliveResponseRequired;
+
+    protected WireFormat wireFormat;
+
+    private final Runnable readChecker = new Runnable() {
+        long lastRunTime;
+        public void run() {
+            long now = System.currentTimeMillis();
+            long elapsed = (now-lastRunTime);
+
+            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
+                LOG.debug(""+elapsed+" ms elapsed since last read check.");
+            }
+
+            // Perhaps the timer executed a read check late, and then executes
+            // the next read check on time, which causes the time elapsed between
+            // read checks to be small.
+
+            // If less than 90% of the read check time has elapsed then abort this read check.
+            if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
+                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
+                return;
+            }
+
+            lastRunTime = now;
+            readCheck();
+        }
+    };
+
+    private boolean allowReadCheck(long elapsed) {
+        return elapsed > (readCheckTime * 9 / 10);
+    }
+
+    private final Runnable writeChecker = new Runnable() {
+        long lastRunTime;
+        public void run() {
+            long now = System.currentTimeMillis();
+            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
+                LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
+
+            }
+            lastRunTime = now;
+            writeCheck();
+        }
+    };
+
+    public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
+        super(next);
+        this.wireFormat = wireFormat;
+    }
+
+    public void start() throws Exception {
+        next.start();
+        startMonitorThreads();
+    }
+
+    public void stop() throws Exception {
+        stopMonitorThreads();
+        next.stop();
+    }
+
+    final void writeCheck() {
+        if (inSend.get()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A send is in progress");
+            }
+            return;
+        }
+
+        if (!commandSent.get() && useKeepAlive) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
+            }
+            ASYNC_TASKS.execute(new Runnable() {
+                public void run() {
+                    if (monitorStarted.get()) {
+                        try {
+                            KeepAliveInfo info = new KeepAliveInfo();
+                            info.setResponseRequired(keepAliveResponseRequired);
+                            oneway(info);
+                        } catch (IOException e) {
+                            onException(e);
+                        }
+                    }
+                };
+            });
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(this + " message sent since last write check, resetting flag");
+            }
+        }
+
+        commandSent.set(false);
+    }
+
+    final void readCheck() {
+        int currentCounter = next.getReceiveCounter();
+        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
+        if (inReceive.get() || currentCounter!=previousCounter ) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A receive is in progress");
+            }
+            return;
+        }
+        if (!commandReceived.get()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            }
+            ASYNC_TASKS.execute(new Runnable() {
+                public void run() {
+                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
+                };
+
+            });
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message received since last read check, resetting flag: ");
+            }
+        }
+        commandReceived.set(false);
+    }
+
+    protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
+    protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
+
+    public void onCommand(Object command) {
+        commandReceived.set(true);
+        inReceive.set(true);
+        try {
+            if (command.getClass() == KeepAliveInfo.class) {
+                KeepAliveInfo info = (KeepAliveInfo) command;
+                if (info.isResponseRequired()) {
+                    try {
+                        info.setResponseRequired(false);
+                        oneway(info);
+                    } catch (IOException e) {
+                        onException(e);
+                    }
+                }
+            } else {
+                if (command.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        try {
+                            processInboundWireFormatInfo((WireFormatInfo) command);
+                        } catch (IOException e) {
+                            onException(e);
+                        }
+                    }
+                }
+                synchronized (readChecker) {
+                    transportListener.onCommand(command);
+                }
+            }
+        } finally {
+
+            inReceive.set(false);
+        }
+    }
+
+    public void oneway(Object o) throws IOException {
+        // Disable inactivity monitoring while processing a command.
+        // Synchronize this method - it's not synchronized
+        // further down the transport stack and gets called by more
+        // than one thread by this class.
+        synchronized(inSend) {
+            inSend.set(true);
+            try {
+
+                if( failed.get() ) {
+                    throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
+                }
+                if (o.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        processOutboundWireFormatInfo((WireFormatInfo) o);
+                    }
+                }
+                next.oneway(o);
+            } finally {
+                commandSent.set(true);
+                inSend.set(false);
+            }
+        }
+    }
+
+    public void onException(IOException error) {
+        if (failed.compareAndSet(false, true)) {
+            stopMonitorThreads();
+            transportListener.onException(error);
+        }
+    }
+
+    public void setUseKeepAlive(boolean val) {
+        useKeepAlive = val;
+    }
+
+    public long getReadCheckTime() {
+        return readCheckTime;
+    }
+
+    public void setReadCheckTime(long readCheckTime) {
+        this.readCheckTime = readCheckTime;
+    }
+
+    public long getWriteCheckTime() {
+        return writeCheckTime;
+    }
+
+    public void setWriteCheckTime(long writeCheckTime) {
+        this.writeCheckTime = writeCheckTime;
+    }
+
+    public long getInitialDelayTime() {
+        return initialDelayTime;
+    }
+
+    public void setInitialDelayTime(long initialDelayTime) {
+        this.initialDelayTime = initialDelayTime;
+    }
+
+    public boolean isKeepAliveResponseRequired() {
+        return this.keepAliveResponseRequired;
+    }
+
+    public void setKeepAliveResponseRequired(boolean value) {
+        this.keepAliveResponseRequired = value;
+    }
+
+    public boolean isMonitorStarted() {
+        return this.monitorStarted.get();
+    }
+
+    protected synchronized void startMonitorThreads() throws IOException {
+        if (monitorStarted.get()) {
+            return;
+        }
+
+        if (!configuredOk()) {
+            return;
+        }
+
+        if (readCheckTime > 0) {
+            readCheckerTask = new SchedulerTimerTask(readChecker);
+        }
+
+        if (writeCheckTime > 0) {
+            writeCheckerTask = new SchedulerTimerTask(writeChecker);
+        }
+
+        if (writeCheckTime > 0 || readCheckTime > 0) {
+            monitorStarted.set(true);
+            synchronized(AbstractInactivityMonitor.class) {
+                if( CHECKER_COUNTER == 0 ) {
+                    ASYNC_TASKS = createExecutor();
+                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
+                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
+                }
+                CHECKER_COUNTER++;
+                if (readCheckTime > 0) {
+                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
+                }
+                if (writeCheckTime > 0) {
+                    WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
+                }
+            }
+        }
+    }
+
+    abstract protected boolean configuredOk() throws IOException;
+
+    protected synchronized void stopMonitorThreads() {
+        if (monitorStarted.compareAndSet(true, false)) {
+            if (readCheckerTask != null) {
+                readCheckerTask.cancel();
+            }
+            if (writeCheckerTask != null) {
+                writeCheckerTask.cancel();
+            }
+            synchronized( AbstractInactivityMonitor.class ) {
+                WRITE_CHECK_TIMER.purge();
+                READ_CHECK_TIMER.purge();
+                CHECKER_COUNTER--;
+                if(CHECKER_COUNTER==0) {
+                  WRITE_CHECK_TIMER.cancel();
+                  READ_CHECK_TIMER.cancel();
+                    WRITE_CHECK_TIMER = null;
+                    READ_CHECK_TIMER = null;
+                    ASYNC_TASKS.shutdownNow();
+                    ASYNC_TASKS = null;
+                }
+            }
+        }
+    }
+
+    private ThreadFactory factory = new ThreadFactory() {
+        public Thread newThread(Runnable runnable) {
+            Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
+            thread.setDaemon(true);
+            return thread;
+        }
+    };
+
+    private ThreadPoolExecutor createExecutor() {
+        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
+        exec.allowCoreThreadTimeOut(true);
+        return exec;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Fri Aug 12 20:29:29 2011
@@ -17,17 +17,8 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
-import java.util.Timer;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.thread.SchedulerTimerTask;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,293 +26,59 @@ import org.slf4j.LoggerFactory;
 /**
  * Used to make sure that commands are arriving periodically from the peer of
  * the transport.
- *
- *
  */
-public class InactivityMonitor extends TransportFilter {
+public class InactivityMonitor extends AbstractInactivityMonitor {
 
     private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
-    private static ThreadPoolExecutor ASYNC_TASKS;
-    private static int CHECKER_COUNTER;
-    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
-    private static Timer  READ_CHECK_TIMER;
-    private static Timer  WRITE_CHECK_TIMER;
 
     private WireFormatInfo localWireFormatInfo;
     private WireFormatInfo remoteWireFormatInfo;
-    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
-
-    private final AtomicBoolean commandSent = new AtomicBoolean(false);
-    private final AtomicBoolean inSend = new AtomicBoolean(false);
-    private final AtomicBoolean failed = new AtomicBoolean(false);
-
-    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
-    private final AtomicBoolean inReceive = new AtomicBoolean(false);
-    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
-
-    private SchedulerTimerTask writeCheckerTask;
-    private SchedulerTimerTask readCheckerTask;
 
     private boolean ignoreRemoteWireFormat = false;
     private boolean ignoreAllWireFormatInfo = false;
 
-    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
-    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
-    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
-    private boolean useKeepAlive = true;
-    private boolean keepAliveResponseRequired;
-    private WireFormat wireFormat;
-
-    private final Runnable readChecker = new Runnable() {
-        long lastRunTime;
-        public void run() {
-            long now = System.currentTimeMillis();
-            long elapsed = (now-lastRunTime);
-
-            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
-                LOG.debug(""+elapsed+" ms elapsed since last read check.");
-            }
-
-            // Perhaps the timer executed a read check late.. and then executes
-            // the next read check on time which causes the time elapsed between
-            // read checks to be small..
-
-            // If less than 90% of the read check Time elapsed then abort this readcheck.
-            if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
-                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
-                return;
-            }
-
-            lastRunTime = now;
-            readCheck();
-        }
-    };
-
-    private boolean allowReadCheck(long elapsed) {
-        return elapsed > (readCheckTime * 9 / 10);
-    }
-
-    private final Runnable writeChecker = new Runnable() {
-        long lastRunTime;
-        public void run() {
-            long now = System.currentTimeMillis();
-            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
-                LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
-
-            }
-            lastRunTime = now;
-            writeCheck();
-        }
-    };
-
     public InactivityMonitor(Transport next, WireFormat wireFormat) {
-        super(next);
-        this.wireFormat = wireFormat;
+        super(next, wireFormat); // wireFormat is now handed to the AbstractInactivityMonitor constructor instead of being assigned here
         if (this.wireFormat == null) {
             this.ignoreAllWireFormatInfo = true;
         }
     }
 
-    public void start() throws Exception {
-        next.start();
-        startMonitorThreads();
-    }
-
-    public void stop() throws Exception {
-        stopMonitorThreads();
-        next.stop();
-    }
-
-    final void writeCheck() {
-        if (inSend.get()) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("A send is in progress");
-            }
-            return;
-        }
-
-        if (!commandSent.get() && useKeepAlive) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
-            }
-            ASYNC_TASKS.execute(new Runnable() {
-                public void run() {
-                    if (monitorStarted.get()) {
-                        try {
-
-                            KeepAliveInfo info = new KeepAliveInfo();
-                            info.setResponseRequired(keepAliveResponseRequired);
-                            oneway(info);
-                        } catch (IOException e) {
-                            onException(e);
-                        }
-                    }
-                };
-            });
-        } else {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace(this + " message sent since last write check, resetting flag");
-            }
-        }
-
-        commandSent.set(false);
-    }
-
-    final void readCheck() {
-        int currentCounter = next.getReceiveCounter();
-        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
-        if (inReceive.get() || currentCounter!=previousCounter ) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("A receive is in progress");
-            }
-            return;
-        }
-        if (!commandReceived.get()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
-            }
-            ASYNC_TASKS.execute(new Runnable() {
-                public void run() {
-                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
-                };
-
-            });
-        } else {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Message received since last read check, resetting flag: ");
-            }
-        }
-        commandReceived.set(false);
-    }
-
-    public void onCommand(Object command) {
-        commandReceived.set(true);
-        inReceive.set(true);
+    protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException { // stores the given WireFormatInfo as the remote one, then tries to start the monitor
+        IOException error = null;
+        remoteWireFormatInfo = info; // NOTE(review): the removed onCommand() code did this under synchronized (this) -- confirm the caller still holds that lock
         try {
-            if (command.getClass() == KeepAliveInfo.class) {
-                KeepAliveInfo info = (KeepAliveInfo) command;
-                if (info.isResponseRequired()) {
-                    try {
-                        info.setResponseRequired(false);
-                        oneway(info);
-                    } catch (IOException e) {
-                        onException(e);
-                    }
-                }
-            } else {
-                if (command.getClass() == WireFormatInfo.class) {
-                    synchronized (this) {
-                        IOException error = null;
-                        remoteWireFormatInfo = (WireFormatInfo) command;
-                        try {
-                            startMonitorThreads();
-                        } catch (IOException e) {
-                            error = e;
-                        }
-                        if (error != null) {
-                            onException(error);
-                        }
-                    }
-                }
-                synchronized (readChecker) {
-                    transportListener.onCommand(command);
-                }
-            }
-        } finally {
-
-            inReceive.set(false);
+            startMonitorThreads();
+        } catch (IOException e) {
+            error = e; // remembered so onException() is invoked after the try block, as in the removed code
         }
-    }
-
-    public void oneway(Object o) throws IOException {
-        // Disable inactivity monitoring while processing a command.
-        //synchronize this method - its not synchronized
-        //further down the transport stack and gets called by more
-        //than one thread  by this class
-        synchronized(inSend) {
-            inSend.set(true);
-            try {
-
-                if( failed.get() ) {
-                    throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
-                }
-                if (o.getClass() == WireFormatInfo.class) {
-                    synchronized (this) {
-                        localWireFormatInfo = (WireFormatInfo)o;
-                        startMonitorThreads();
-                    }
-                }
-                next.oneway(o);
-            } finally {
-                commandSent.set(true);
-                inSend.set(false);
-            }
+        if (error != null) {
+            onException(error); // a start-up failure is routed to onException() rather than rethrown from this method
         }
     }
 
-    public void onException(IOException error) {
-        if (failed.compareAndSet(false, true)) {
-            stopMonitorThreads();
-            transportListener.onException(error);
-        }
-    }
-
-    public void setKeepAliveResponseRequired(boolean val) {
-        keepAliveResponseRequired = val;
-    }
-
-    public void setUseKeepAlive(boolean val) {
-        useKeepAlive = val;
-    }
-
-    public void setIgnoreRemoteWireFormat(boolean val) {
-        ignoreRemoteWireFormat = val;
-    }
-
-    public long getReadCheckTime() {
-        return readCheckTime;
-    }
-
-    public void setReadCheckTime(long readCheckTime) {
-        this.readCheckTime = readCheckTime;
-    }
-
-    public long getInitialDelayTime() {
-        return initialDelayTime;
-    }
-
-    public void setInitialDelayTime(long initialDelayTime) {
-        this.initialDelayTime = initialDelayTime;
+    protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{ // stores the given WireFormatInfo as the local one, then tries to start the monitor
+        localWireFormatInfo = info; // NOTE(review): the removed oneway() code did this under synchronized (this) -- confirm the caller still holds that lock
+        startMonitorThreads(); // unlike the inbound variant, an IOException propagates to the caller
     }
 
-    private synchronized void startMonitorThreads() throws IOException {
-        if (monitorStarted.get()) {
+    @Override
+    protected synchronized void startMonitorThreads() throws IOException { // derives the write check time from the read check time, then delegates to the superclass
+        if (isMonitorStarted()) {
             return;
         }
 
-        if (!configuredOk()) {
-            return;
-        }
+        long readCheckTime = getReadCheckTime(); // NOTE(review): read before super.startMonitorThreads(); if that calls configuredOk(), the value may change afterwards -- confirm
 
         if (readCheckTime > 0) {
-            monitorStarted.set(true);
-            writeCheckerTask = new SchedulerTimerTask(writeChecker);
-            readCheckerTask = new  SchedulerTimerTask(readChecker);
-            writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
-            synchronized( InactivityMonitor.class ) {
-                if( CHECKER_COUNTER == 0 ) {
-                    ASYNC_TASKS = createExecutor();
-                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
-                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
-                }
-                CHECKER_COUNTER++;
-                WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
-                READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
-            }
+            setWriteCheckTime(readCheckTime>3 ? readCheckTime/3 : readCheckTime); // a third of the read check time (integer division); values <= 3 are used unchanged
         }
+
+        super.startMonitorThreads(); // timer/executor set-up removed above is no longer done here; presumably handled by AbstractInactivityMonitor -- not visible in this hunk
     }
 
-    private boolean configuredOk() throws IOException {
+    @Override
+    protected boolean configuredOk() throws IOException {
         boolean configured = false;
         if (ignoreAllWireFormatInfo) {
             configured = true;
@@ -330,54 +87,29 @@ public class InactivityMonitor extends T
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
                 }
-                readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
-                initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
+
+                long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+                long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
+
+                setReadCheckTime(readCheckTime);
+                setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
+                setWriteCheckTime(writeCheckTime);
+
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Using local: " + localWireFormatInfo);
                 }
-                readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
-                initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
-            }
-            configured = true;
-        }
-        return configured;
-    }
 
-    /**
-     *
-     */
-    private synchronized void stopMonitorThreads() {
-        if (monitorStarted.compareAndSet(true, false)) {
-            readCheckerTask.cancel();
-            writeCheckerTask.cancel();
-            synchronized( InactivityMonitor.class ) {
-                WRITE_CHECK_TIMER.purge();
-                READ_CHECK_TIMER.purge();
-                CHECKER_COUNTER--;
-                if(CHECKER_COUNTER==0) {
-                  WRITE_CHECK_TIMER.cancel();
-                  READ_CHECK_TIMER.cancel();
-                    WRITE_CHECK_TIMER = null;
-                    READ_CHECK_TIMER = null;
-                    ASYNC_TASKS.shutdownNow();
-                    ASYNC_TASKS = null;
-                }
-            }
-        }
-    }
+                long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
+                long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
 
-    private ThreadFactory factory = new ThreadFactory() {
-        public Thread newThread(Runnable runnable) {
-            Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
-            thread.setDaemon(true);
-            return thread;
+                setReadCheckTime(readCheckTime);
+                setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());
+                setWriteCheckTime(writeCheckTime);
+            }
+            configured = true;
         }
-    };
 
-    private ThreadPoolExecutor createExecutor() {
-        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
-        exec.allowCoreThreadTimeOut(true);
-        return exec;
+        return configured;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Fri Aug 12 20:29:29 2011
@@ -23,14 +23,12 @@ import org.apache.activemq.Service;
 /**
  * Represents the client side of a transport allowing messages to be sent
  * synchronously, asynchronously and consumed.
- * 
- * 
  */
 public interface Transport extends Service {
 
     /**
      * A one way asynchronous send
-     * 
+     *
      * @param command
      * @throws IOException
      */
@@ -40,7 +38,7 @@ public interface Transport extends Servi
      * An asynchronous request response where the Receipt will be returned in
      * the future. If responseCallback is not null, then it will be called when
      * the response has been completed.
-     * 
+     *
      * @param command
      * @param responseCallback TODO
      * @return the FutureResponse
@@ -50,7 +48,7 @@ public interface Transport extends Servi
 
     /**
      * A synchronous request response
-     * 
+     *
      * @param command
      * @return the response
      * @throws IOException
@@ -59,7 +57,7 @@ public interface Transport extends Servi
 
     /**
      * A synchronous request response
-     * 
+     *
      * @param command
      * @param timeout
      * @return the repsonse or null if timeout
@@ -67,53 +65,16 @@ public interface Transport extends Servi
      */
     Object request(Object command, int timeout) throws IOException;
 
-    // /**
-    // * A one way asynchronous send
-    // * @param command
-    // * @throws IOException
-    // */
-    // void oneway(Command command) throws IOException;
-    //
-    // /**
-    // * An asynchronous request response where the Receipt will be returned
-    // * in the future. If responseCallback is not null, then it will be called
-    // * when the response has been completed.
-    // *
-    // * @param command
-    // * @param responseCallback TODO
-    // * @return the FutureResponse
-    // * @throws IOException
-    // */
-    // FutureResponse asyncRequest(Command command, ResponseCallback
-    // responseCallback) throws IOException;
-    //    
-    // /**
-    // * A synchronous request response
-    // * @param command
-    // * @return the response
-    // * @throws IOException
-    // */
-    // Response request(Command command) throws IOException;
-    //
-    // /**
-    // * A synchronous request response
-    // * @param command
-    // * @param timeout
-    // * @return the repsonse or null if timeout
-    // * @throws IOException
-    // */
-    // Response request(Command command, int timeout) throws IOException;
-
     /**
      * Returns the current transport listener
-     * 
+     *
      * @return
      */
     TransportListener getTransportListener();
 
     /**
      * Registers an inbound command listener
-     * 
+     *
      * @param commandListener
      */
     void setTransportListener(TransportListener commandListener);
@@ -131,26 +92,26 @@ public interface Transport extends Servi
 
     /**
      * Indicates if the transport can handle faults
-     * 
+     *
      * @return true if fault tolerant
      */
     boolean isFaultTolerant();
-    
+
     /**
      * @return true if the transport is disposed
      */
     boolean isDisposed();
-    
+
     /**
      * @return true if the transport is connected
      */
     boolean isConnected();
-    
+
     /**
      * @return true if reconnect is supported
      */
     boolean isReconnectSupported();
-    
+
     /**
      * @return true if updating uris is supported
      */
@@ -161,10 +122,10 @@ public interface Transport extends Servi
      * @throws IOException on failure of if not supported
      */
     void reconnect(URI uri) throws IOException;
-    
+
     /**
      * Provide a list of available alternative locations
-     * @param rebalance 
+     * @param rebalance
      * @param uris
      * @throws IOException
      */
@@ -172,10 +133,10 @@ public interface Transport extends Servi
 
     /**
      * Returns a counter which gets incremented as data is read from the transport.
-     * It should only be used to determine if there is progress being made in reading the next command from the transport.  
-     * The value may wrap into the negative numbers. 
-     * 
+     * It should only be used to determine if there is progress being made in reading the next command from the transport.
+     * The value may wrap into the negative numbers.
+     *
      * @return a counter which gets incremented as data is read from the transport.
      */
-    int getReceiveCounter();    
+    int getReceiveCounter();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Fri Aug 12 20:29:29 2011
@@ -44,7 +44,7 @@ public abstract class TransportFactory {
 
     private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
     private static final String THREAD_NAME_FILTER = "threadName";
-    
+
     public abstract TransportServer doBind(URI location) throws IOException;
 
     public Transport doConnect(URI location, Executor ex) throws Exception {
@@ -57,7 +57,7 @@ public abstract class TransportFactory {
 
     /**
      * Creates a normal transport.
-     * 
+     *
      * @param location
      * @return the transport
      * @throws Exception
@@ -69,7 +69,7 @@ public abstract class TransportFactory {
 
     /**
      * Creates a normal transport.
-     * 
+     *
      * @param location
      * @param ex
      * @return the transport
@@ -83,7 +83,7 @@ public abstract class TransportFactory {
     /**
      * Creates a slimmed down transport that is more efficient so that it can be
      * used by composite transports like reliable and HA.
-     * 
+     *
      * @param location
      * @return the Transport
      * @throws Exception
@@ -96,7 +96,7 @@ public abstract class TransportFactory {
     /**
      * Creates a slimmed down transport that is more efficient so that it can be
      * used by composite transports like reliable and HA.
-     * 
+     *
      * @param location
      * @param ex
      * @return the Transport
@@ -113,12 +113,12 @@ public abstract class TransportFactory {
     }
 
     /**
-     * @deprecated 
+     * @deprecated
      */
     public static TransportServer bind(String brokerId, URI location) throws IOException {
         return bind(location);
     }
-    
+
     public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
         TransportFactory tf = findTransportFactory(location);
         if( brokerService!=null && tf instanceof BrokerServiceAware ) {
@@ -132,7 +132,7 @@ public abstract class TransportFactory {
         } finally {
             SslContext.setCurrentSslContext(null);
         }
-    }    
+    }
 
     public Transport doConnect(URI location) throws Exception {
         try {
@@ -164,7 +164,7 @@ public abstract class TransportFactory {
             throw IOExceptionSupport.create(e);
         }
     }
-    
+
      /**
       * Allow registration of a transport factory without wiring via META-INF classes
      * @param scheme
@@ -176,7 +176,7 @@ public abstract class TransportFactory {
 
     /**
      * Factory method to create a new transport
-     * 
+     *
      * @throws IOException
      * @throws UnknownHostException
      */
@@ -235,13 +235,14 @@ public abstract class TransportFactory {
     /**
      * Fully configures and adds all need transport filters so that the
      * transport can be used by the JMS client.
-     * 
+     *
      * @param transport
      * @param wf
      * @param options
      * @return
      * @throws Exception
      */
+    @SuppressWarnings("rawtypes")
     public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
         transport = compositeConfigure(transport, wf, options);
 
@@ -256,14 +257,15 @@ public abstract class TransportFactory {
      * transport can be used by the ActiveMQ message broker. The main difference
      * between this and the configure() method is that the broker does not issue
      * requests to the client so the ResponseCorrelator is not needed.
-     * 
+     *
      * @param transport
      * @param format
      * @param options
      * @return
      * @throws Exception
      */
-    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+    @SuppressWarnings("rawtypes")
+	public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
         if (options.containsKey(THREAD_NAME_FILTER)) {
             transport = new ThreadNameFilter(transport);
         }
@@ -276,12 +278,13 @@ public abstract class TransportFactory {
      * Similar to configure(...) but this avoid adding in the MutexTransport and
      * ResponseCorrelator transport layers so that the resulting transport can
      * more efficiently be used as part of a composite transport.
-     * 
+     *
      * @param transport
      * @param format
      * @param options
      * @return
      */
+    @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
             transport = new WriteTimeoutFilter(transport);
@@ -294,6 +297,7 @@ public abstract class TransportFactory {
         return transport;
     }
 
+    @SuppressWarnings("rawtypes")
     protected String getOption(Map options, String key, String def) {
         String rc = (String) options.remove(key);
         if( rc == null ) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Fri Aug 12 20:29:29 2011
@@ -22,10 +22,7 @@ import java.net.Socket;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
 import org.apache.activemq.transport.tcp.TimeStampStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,15 +36,15 @@ import org.slf4j.LoggerFactory;
  * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
  * For example (15 second timeout on write operations to the socket):</br>
  * <pre><code>
- * &lt;transportConnector 
- *     name=&quot;tcp1&quot; 
+ * &lt;transportConnector
+ *     name=&quot;tcp1&quot;
  *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
  * /&gt;
  * </code></pre><br/>
  * For example (enable default timeout on the socket):</br>
  * <pre><code>
- * &lt;transportConnector 
- *     name=&quot;tcp1&quot; 
+ * &lt;transportConnector
+ *     name=&quot;tcp1&quot;
  *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
  * /&gt;
  * </code></pre>
@@ -59,12 +56,12 @@ public class WriteTimeoutFilter extends 
     private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);
     protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
     protected static AtomicInteger messageCounter = new AtomicInteger(0);
-    protected static TimeoutThread timeoutThread = new TimeoutThread(); 
-    
+    protected static TimeoutThread timeoutThread = new TimeoutThread();
+
     protected static long sleep = 5000l;
 
     protected long writeTimeout = -1;
-    
+
     public WriteTimeoutFilter(Transport next) {
         super(next);
     }
@@ -80,7 +77,7 @@ public class WriteTimeoutFilter extends 
             deRegisterWrite(this,false,null);
         }
     }
-    
+
     public long getWriteTimeout() {
         return writeTimeout;
     }
@@ -88,7 +85,7 @@ public class WriteTimeoutFilter extends 
     public void setWriteTimeout(long writeTimeout) {
         this.writeTimeout = writeTimeout;
     }
-    
+
     public static long getSleep() {
         return sleep;
     }
@@ -97,21 +94,21 @@ public class WriteTimeoutFilter extends 
         WriteTimeoutFilter.sleep = sleep;
     }
 
-    
+
     protected TimeStampStream getWriter() {
         return next.narrow(TimeStampStream.class);
     }
-    
+
     protected Socket getSocket() {
         return next.narrow(Socket.class);
     }
-    
+
     protected static void registerWrite(WriteTimeoutFilter filter) {
         writers.add(filter);
     }
-    
+
     protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
-        boolean result = writers.remove(filter); 
+        boolean result = writers.remove(filter);
         if (result) {
             if (fail) {
                 String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
@@ -129,17 +126,17 @@ public class WriteTimeoutFilter extends 
         }
         return result;
     }
-    
+
     @Override
     public void start() throws Exception {
         super.start();
     }
-    
+
     @Override
     public void stop() throws Exception {
         super.stop();
     }
-    
+
     protected static class TimeoutThread extends Thread {
         static AtomicInteger instance = new AtomicInteger(0);
         boolean run = true;
@@ -150,14 +147,14 @@ public class WriteTimeoutFilter extends 
             start();
         }
 
-        
+
         public void run() {
             while (run) {
-            	boolean error = false;
+                boolean error = false;
                 try {
-                	if (!interrupted()) {
-                		Iterator<WriteTimeoutFilter> filters = writers.iterator();
-                	    while (run && filters.hasNext()) { 
+                    if (!interrupted()) {
+                        Iterator<WriteTimeoutFilter> filters = writers.iterator();
+                        while (run && filters.hasNext()) {
                             WriteTimeoutFilter filter = filters.next();
                             if (filter.getWriteTimeout()<=0) continue; //no timeout set
                             long writeStart = filter.getWriter().getWriteTimestamp();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Fri Aug 12 20:29:29 2011
@@ -22,12 +22,9 @@ import java.util.Map;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.Message;
 
 /**
  * Implementations of this interface are used to map back and forth from Stomp

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java Fri Aug 12 20:29:29 2011
@@ -34,8 +34,6 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.util.JettisonMappedXmlDriver;
 import org.codehaus.jettison.mapped.Configuration;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
 
 import com.thoughtworks.xstream.XStream;
 import com.thoughtworks.xstream.io.HierarchicalStreamReader;
@@ -50,206 +48,208 @@ import com.thoughtworks.xstream.io.xml.X
  * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
  */
 public class JmsFrameTranslator extends LegacyFrameTranslator implements
-		BrokerContextAware {
+        BrokerContextAware {
 
-	XStream xStream = null;
-	BrokerContext brokerContext;
+    XStream xStream = null;
+    BrokerContext brokerContext;
 
-	public ActiveMQMessage convertFrame(ProtocolConverter converter,
-			StompFrame command) throws JMSException, ProtocolException {
-		Map headers = command.getHeaders();
-		ActiveMQMessage msg;
-		String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
-		if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
-			msg = super.convertFrame(converter, command);
-		} else {
-			HierarchicalStreamReader in;
-
-			try {
-				String text = new String(command.getContent(), "UTF-8");
-				switch (Stomp.Transformations.getValue(transformation)) {
-				case JMS_OBJECT_XML:
-					in = new XppReader(new StringReader(text));
-					msg = createObjectMessage(in);
-					break;
-				case JMS_OBJECT_JSON:
-					in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
-					msg = createObjectMessage(in);
-					break;
-				case JMS_MAP_XML:
-					in = new XppReader(new StringReader(text));
-					msg = createMapMessage(in);
-					break;
-				case JMS_MAP_JSON:
-					in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
-					msg = createMapMessage(in);
-					break;
-				default:
-					throw new Exception("Unkown transformation: " + transformation);
-				}
-			} catch (Throwable e) {
-				command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
-				msg = super.convertFrame(converter, command);
-			}
-		}
-		FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
-		return msg;
-	}
-
-	public StompFrame convertMessage(ProtocolConverter converter,
-			ActiveMQMessage message) throws IOException, JMSException {
-		if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
-			StompFrame command = new StompFrame();
-			command.setAction(Stomp.Responses.MESSAGE);
-			Map<String, String> headers = new HashMap<String, String>(25);
-			command.setHeaders(headers);
+    public ActiveMQMessage convertFrame(ProtocolConverter converter,
+            StompFrame command) throws JMSException, ProtocolException {
+        Map<String, String> headers = command.getHeaders();
+        ActiveMQMessage msg;
+        String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
+        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
+            msg = super.convertFrame(converter, command);
+        } else {
+            HierarchicalStreamReader in;
+
+            try {
+                String text = new String(command.getContent(), "UTF-8");
+                switch (Stomp.Transformations.getValue(transformation)) {
+                case JMS_OBJECT_XML:
+                    in = new XppReader(new StringReader(text));
+                    msg = createObjectMessage(in);
+                    break;
+                case JMS_OBJECT_JSON:
+                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                    msg = createObjectMessage(in);
+                    break;
+                case JMS_MAP_XML:
+                    in = new XppReader(new StringReader(text));
+                    msg = createMapMessage(in);
+                    break;
+                case JMS_MAP_JSON:
+                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                    msg = createMapMessage(in);
+                    break;
+                default:
+                    throw new Exception("Unkown transformation: " + transformation);
+                }
+            } catch (Throwable e) {
+                command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
+                msg = super.convertFrame(converter, command);
+            }
+        }
+        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+        return msg;
+    }
 
-			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-					converter, message, command, this);
+    public StompFrame convertMessage(ProtocolConverter converter,
+            ActiveMQMessage message) throws IOException, JMSException {
+        if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
+            StompFrame command = new StompFrame();
+            command.setAction(Stomp.Responses.MESSAGE);
+            Map<String, String> headers = new HashMap<String, String>(25);
+            command.setHeaders(headers);
+
+            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+                    converter, message, command, this);
 
             if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
-            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
+                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
             } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
-            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
+                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
             }
 
             ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
-			command.setContent(marshall(msg.getObject(),
-					headers.get(Stomp.Headers.TRANSFORMATION))
-					.getBytes("UTF-8"));
-			return command;
-
-		} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
-			StompFrame command = new StompFrame();
-			command.setAction(Stomp.Responses.MESSAGE);
-			Map<String, String> headers = new HashMap<String, String>(25);
-			command.setHeaders(headers);
+            command.setContent(marshall(msg.getObject(),
+                    headers.get(Stomp.Headers.TRANSFORMATION))
+                    .getBytes("UTF-8"));
+            return command;
+
+        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
+            StompFrame command = new StompFrame();
+            command.setAction(Stomp.Responses.MESSAGE);
+            Map<String, String> headers = new HashMap<String, String>(25);
+            command.setHeaders(headers);
 
-			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-					converter, message, command, this);
+            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+                    converter, message, command, this);
 
             if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
-            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
+                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
             } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
-            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
+                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
             }
 
-			ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
-			command.setContent(marshall((Serializable)msg.getContentMap(),
-					headers.get(Stomp.Headers.TRANSFORMATION))
-					.getBytes("UTF-8"));
-			return command;
+            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+            command.setContent(marshall((Serializable)msg.getContentMap(),
+                    headers.get(Stomp.Headers.TRANSFORMATION))
+                    .getBytes("UTF-8"));
+            return command;
         } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
                 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
 
-			StompFrame command = new StompFrame();
-			command.setAction(Stomp.Responses.MESSAGE);
-			Map<String, String> headers = new HashMap<String, String>(25);
-			command.setHeaders(headers);
+            StompFrame command = new StompFrame();
+            command.setAction(Stomp.Responses.MESSAGE);
+            Map<String, String> headers = new HashMap<String, String>(25);
+            command.setHeaders(headers);
 
             FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-					converter, message, command, this);
+                    converter, message, command, this);
 
             if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
-            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
+                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
             } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
-            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
+                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
             }
 
             String body = marshallAdvisory(message.getDataStructure(),
-            		headers.get(Stomp.Headers.TRANSFORMATION));
+                    headers.get(Stomp.Headers.TRANSFORMATION));
             command.setContent(body.getBytes("UTF-8"));
             return command;
-		} else {
-			return super.convertMessage(converter, message);
-		}
-	}
-
-	/**
-	 * Marshalls the Object to a string using XML or JSON encoding
-	 */
-	protected String marshall(Serializable object, String transformation)
-			throws JMSException {
-		StringWriter buffer = new StringWriter();
-		HierarchicalStreamWriter out;
-		if (transformation.toLowerCase().endsWith("json")) {
-			out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
-		} else {
-			out = new PrettyPrintWriter(buffer);
-		}
-		getXStream().marshal(object, out);
-		return buffer.toString();
-	}
-
-	protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
-		ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
-		Object obj = getXStream().unmarshal(in);
-		objMsg.setObject((Serializable) obj);
-		return objMsg;
-	}
-
-	protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
-		ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
-		Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
-		for (String key : map.keySet()) {
-			mapMsg.setObject(key, map.get(key));
-		}
-		return mapMsg;
-	}
+        } else {
+            return super.convertMessage(converter, message);
+        }
+    }
+
+    /**
+     * Marshalls the Object to a string using XML or JSON encoding
+     */
+    protected String marshall(Serializable object, String transformation)
+            throws JMSException {
+        StringWriter buffer = new StringWriter();
+        HierarchicalStreamWriter out;
+        if (transformation.toLowerCase().endsWith("json")) {
+            out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
+        } else {
+            out = new PrettyPrintWriter(buffer);
+        }
+        getXStream().marshal(object, out);
+        return buffer.toString();
+    }
+
+    protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
+        ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+        Object obj = getXStream().unmarshal(in);
+        objMsg.setObject((Serializable) obj);
+        return objMsg;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
+        ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
+        Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
+        for (String key : map.keySet()) {
+            mapMsg.setObject(key, map.get(key));
+        }
+        return mapMsg;
+    }
 
     protected String marshallAdvisory(final DataStructure ds, String transformation) {
 
-		StringWriter buffer = new StringWriter();
-		HierarchicalStreamWriter out;
-		if (transformation.toLowerCase().endsWith("json")) {
-			out = new JettisonMappedXmlDriver().createWriter(buffer);
-		} else {
-			out = new PrettyPrintWriter(buffer);
-		}
+        StringWriter buffer = new StringWriter();
+        HierarchicalStreamWriter out;
+        if (transformation.toLowerCase().endsWith("json")) {
+            out = new JettisonMappedXmlDriver().createWriter(buffer);
+        } else {
+            out = new PrettyPrintWriter(buffer);
+        }
 
-		XStream xstream = getXStream();
+        XStream xstream = getXStream();
         xstream.setMode(XStream.NO_REFERENCES);
         xstream.aliasPackage("", "org.apache.activemq.command");
-		xstream.marshal(ds, out);
-		return buffer.toString();
+        xstream.marshal(ds, out);
+        return buffer.toString();
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+    public XStream getXStream() {
+        if (xStream == null) {
+            xStream = createXStream();
+        }
+        return xStream;
+    }
+
+    public void setXStream(XStream xStream) {
+        this.xStream = xStream;
     }
 
-	// Properties
-	// -------------------------------------------------------------------------
-	public XStream getXStream() {
-		if (xStream == null) {
-			xStream = createXStream();
-		}
-		return xStream;
-	}
-
-	public void setXStream(XStream xStream) {
-		this.xStream = xStream;
-	}
-
-	// Implementation methods
-	// -------------------------------------------------------------------------
-	protected XStream createXStream() {
-		XStream xstream = null;
-		if (brokerContext != null) {
-			Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
-			for (XStream bean : beans.values()) {
-			    if (bean != null) {
-			        xstream = bean;
-			        break;
-			    }
-			}
-		}
-
-		if (xstream == null) {
-			xstream = new XStream();
-		}
-		return xstream;
-
-	}
-
-	public void setBrokerContext(BrokerContext brokerContext) {
-		this.brokerContext = brokerContext;
-	}
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    @SuppressWarnings("unchecked")
+    protected XStream createXStream() {
+        XStream xstream = null;
+        if (brokerContext != null) {
+            Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
+            for (XStream bean : beans.values()) {
+                if (bean != null) {
+                    xstream = bean;
+                    break;
+                }
+            }
+        }
+
+        if (xstream == null) {
+            xstream = new XStream();
+        }
+        return xstream;
+
+    }
+
+    public void setBrokerContext(BrokerContext brokerContext) {
+        this.brokerContext = brokerContext;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Fri Aug 12 20:29:29 2011
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -23,10 +24,17 @@ import java.util.Map;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+
 import com.thoughtworks.xstream.XStream;
 import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.command.*;
 
 /**
  * Implements ActiveMQ 4.0 translations
@@ -35,7 +43,7 @@ public class LegacyFrameTranslator imple
 
 
     public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
-        final Map headers = command.getHeaders();
+        final Map<?, ?> headers = command.getHeaders();
         final ActiveMQMessage msg;
         /*
          * To reduce the complexity of this method perhaps a Chain of Responsibility
@@ -46,7 +54,12 @@ public class LegacyFrameTranslator imple
             if(intendedType.equalsIgnoreCase("text")){
                 ActiveMQTextMessage text = new ActiveMQTextMessage();
                 try {
-                    text.setText(new String(command.getContent(), "UTF-8"));
+                    //text.setText(new String(command.getContent(), "UTF-8"));
+                    ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
+                    DataOutputStream data = new DataOutputStream(bytes);
+                    data.writeInt(command.getContent().length);
+                    data.write(command.getContent());
+                    text.setContent(bytes.toByteSequence());
                 } catch (Throwable e) {
                     throw new ProtocolException("Text could not bet set: " + e, false, e);
                 }
@@ -66,7 +79,12 @@ public class LegacyFrameTranslator imple
         } else {
             ActiveMQTextMessage text = new ActiveMQTextMessage();
             try {
-                text.setText(new String(command.getContent(), "UTF-8"));
+                //text.setText(new String(command.getContent(), "UTF-8"));
+                ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
+                DataOutputStream data = new DataOutputStream(bytes);
+                data.writeInt(command.getContent().length);
+                data.write(command.getContent());
+                text.setContent(bytes.toByteSequence());
             } catch (Throwable e) {
                 throw new ProtocolException("Text could not bet set: " + e, false, e);
             }
@@ -86,8 +104,17 @@ public class LegacyFrameTranslator imple
 
         if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
 
-            ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
-            command.setContent(msg.getText().getBytes("UTF-8"));
+            if (!message.isCompressed() && message.getContent() != null) {
+                ByteSequence msgContent = message.getContent();
+                if (msgContent.getLength() > 4) {
+                    byte[] content = new byte[msgContent.getLength() - 4];
+                    System.arraycopy(msgContent.data, 4, content, 0, content.length);
+                    command.setContent(content);
+                }
+            } else {
+                ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+                command.setContent(msg.getText().getBytes("UTF-8"));
+            }
 
         } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
 
@@ -96,13 +123,13 @@ public class LegacyFrameTranslator imple
             byte[] data = new byte[(int)msg.getBodyLength()];
             msg.readBytes(data);
 
-            headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
+            headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length));
             command.setContent(data);
         } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
                 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
 
             FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-					converter, message, command, this);
+                    converter, message, command, this);
 
             String body = marshallAdvisory(message.getDataStructure());
             command.setContent(body.getBytes("UTF-8"));
@@ -119,10 +146,10 @@ public class LegacyFrameTranslator imple
 
         String rc = converter.getCreatedTempDestinationName(activeMQDestination);
         if( rc!=null ) {
-        	return rc;
+            return rc;
         }
 
-        StringBuffer buffer = new StringBuffer();
+        StringBuilder buffer = new StringBuilder();
         if (activeMQDestination.isQueue()) {
             if (activeMQDestination.isTemporary()) {
                 buffer.append("/remote-temp-queue/");