You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2014/01/30 19:20:56 UTC

git commit: Fixes APLO-342: Memory leaks when using openwire with transactions

Updated Branches:
  refs/heads/trunk 85d3413ff -> d12dbb322


Fixes APLO-342: Memory leaks when using openwire with transactions

Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/d12dbb32
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/d12dbb32
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/d12dbb32

Branch: refs/heads/trunk
Commit: d12dbb322122c07c77c47ae94ffd33f425895518
Parents: 85d3413
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Jan 30 13:20:45 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Jan 30 13:20:45 2014 -0500

----------------------------------------------------------------------
 .../openwire/OpenwireProtocolHandler.scala      |  2 ++
 .../apollo/openwire/test/TransactionTest.scala  | 28 ++++++++++++++++++++
 .../activemq/apollo/util/FunSuiteSupport.scala  |  8 ++++++
 3 files changed, 38 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/d12dbb32/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala b/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
index cde434e..97aa182 100644
--- a/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
+++ b/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
@@ -587,11 +587,13 @@ class OpenwireProtocolHandler extends ProtocolHandler {
 
       case TransactionInfo.COMMIT_ONE_PHASE =>
         get_tx_ctx(id).commit {
+          remove_tx_ctx(id)
           ack(info)
         }
 
       case TransactionInfo.ROLLBACK =>
         get_tx_ctx(id).rollback
+        remove_tx_ctx(id)
         ack(info)
 
       case TransactionInfo.FORGET =>

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/d12dbb32/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
----------------------------------------------------------------------
diff --git a/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala b/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
index 20fab9e..3a1fa71 100644
--- a/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
+++ b/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
@@ -128,4 +128,32 @@ class OpenwireLevelDBTransactionTest extends TransactionTest {
 //    disconnect() }
   }
 
+  ignore("APLO-342: Test memory usage"){
+    connect()
+    val dest = queue(next_id("example"))
+    val message_count = 1000000
+    val producer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = producer_session.createProducer(dest)
+    producer.setDeliveryMode(DeliveryMode.PERSISTENT)
+
+    val consumer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
+    val consumer = consumer_session.createConsumer(dest)
+
+    for( i <- 1 to message_count) {
+      if( (i % (message_count/100)) == 0) {
+        println("On message: %d, jvm heap: %.2f".format(i, getJVMHeapUsage/(1024*1024.0)))
+      }
+      val x = producer_session.createTextMessage("x" * (1024*64))
+      x.setIntProperty("i", i)
+      producer.send(x)
+
+      val m = consumer.receive(1000).asInstanceOf[TextMessage]
+      m should not be (null)
+      m.getIntProperty("i") should be (i)
+      consumer_session.commit()
+    }
+  }
+
+
 }
+

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/d12dbb32/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
----------------------------------------------------------------------
diff --git a/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
index d3fa066..410f6d0 100644
--- a/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
+++ b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
@@ -28,6 +28,9 @@ import scala.Some
 import org.apache.activemq.apollo.util.FunSuiteSupport._
 import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock}
 import java.util.concurrent.atomic.AtomicLong
+import java.lang.management.ManagementFactory
+import javax.management.ObjectName
+import javax.management.openmbean.CompositeData
 
 object FunSuiteSupport {
   class SkipTestException extends RuntimeException
@@ -58,6 +61,11 @@ abstract class FunSuiteSupport extends FunSuite with Logging with ParallelBefore
 
   def skip(check:Boolean=true):Unit = if(check) throw new SkipTestException()
 
+  def getJVMHeapUsage = {
+    val mbean_server = ManagementFactory.getPlatformMBeanServer()
+    val data = mbean_server.getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage").asInstanceOf[CompositeData]
+    data.get("used").asInstanceOf[java.lang.Long].longValue()
+  }
 
   var _log:Log = null
   override protected def log: Log = {