You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/01/11 12:57:11 UTC

activemq git commit: [AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are aggressively cached

Repository: activemq
Updated Branches:
  refs/heads/master 4535e8f09 -> ec6fa1909


[AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are aggressively cached


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ec6fa190
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ec6fa190
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ec6fa190

Branch: refs/heads/master
Commit: ec6fa190999160676cab900038b268b2d40a4d5c
Parents: 4535e8f
Author: gtully <ga...@gmail.com>
Authored: Thu Jan 11 12:56:40 2018 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Jan 11 12:56:40 2018 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  4 +-
 .../store/kahadb/disk/journal/Location.java     | 13 +--
 .../DataFileAppenderNoSpaceNoBatchTest.java     |  2 +-
 .../org/apache/activemq/bugs/AMQ6815Test.java   | 95 ++++++++++++++++++++
 4 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index b391de7..94de6ea 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2132,8 +2132,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
             try {
                 location.getLatch().await();
-                if (location.getBatch().exception.get() != null) {
-                    throw location.getBatch().exception.get();
+                if (location.getException().get() != null) {
+                    throw location.getException().get();
                 }
             } catch (InterruptedException e) {
                 throw new InterruptedIOException(e.toString());

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
index f3da47a..673d9f6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Used as a location in the data store.
@@ -36,7 +37,8 @@ public final class Location implements Comparable<Location> {
     private int offset = NOT_SET;
     private int size = NOT_SET;
     private byte type = NOT_SET_TYPE;
-    private DataFileAppender.WriteBatch batch;
+    private CountDownLatch latch;
+    private AtomicReference<IOException> exception;
 
     public Location() {
     }
@@ -114,11 +116,12 @@ public final class Location implements Comparable<Location> {
     }
 
     public CountDownLatch getLatch() {
-        return batch.latch;
+        return latch;
     }
 
     public void setBatch(DataFileAppender.WriteBatch batch) {
-        this.batch = batch;
+        this.latch = batch.latch;
+        this.exception = batch.exception;
     }
 
     public int compareTo(Location o) {
@@ -142,7 +145,7 @@ public final class Location implements Comparable<Location> {
         return dataFileId ^ offset;
     }
 
-    public DataFileAppender.WriteBatch getBatch() {
-        return batch;
+    public AtomicReference<IOException> getException() {
+        return exception;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
index a6b19ee..6d778c3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
@@ -183,7 +183,7 @@ public class DataFileAppenderNoSpaceNoBatchTest {
 
         boolean someExceptions = false;
         for (Location location: locations) {
-            someExceptions |= (location.getBatch().exception != null);
+            someExceptions |= (location.getException().get() != null);
         }
         assertTrue(someExceptions);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
new file mode 100644
index 0000000..0b41195
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+public class AMQ6815Test { // Heap-usage regression test: sends many messages to one queue and bounds the growth in used heap.
+   static final Logger LOG = LoggerFactory.getLogger(AMQ6815Test.class);
+   private final static int MEM_LIMIT = 5*1024*1024; // 5 MiB; set as the default destination policy memory limit in setUp()
+   private final static byte[] payload = new byte[5*1024]; // 5 KiB zero-filled body written into every message
+   // Fixture state below is created in setUp() and released in tearDown().
+      protected BrokerService brokerService;
+      protected Connection connection;
+      protected Session session;
+      protected Queue amqDestination;
+      // Starts a broker whose default policy entry carries MEM_LIMIT, then opens a connection, session and queue "QQ".
+      @Before
+      public void setUp() throws Exception {
+         brokerService = new BrokerService();
+         PolicyEntry policy = new PolicyEntry();
+         policy.setMemoryLimit(MEM_LIMIT);
+         PolicyMap pMap = new PolicyMap();
+         pMap.setDefaultEntry(policy); // default entry, so the limit is not tied to a specific destination name
+         brokerService.setDestinationPolicy(pMap);
+
+         brokerService.start();
+         connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
+         connection.start();
+         session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE); // non-transacted, auto-acknowledge
+         amqDestination = session.createQueue("QQ");
+      }
+      // Closes the client connection (if one was created) and then stops the broker.
+      @After
+      public void tearDown() throws Exception {
+         if (connection != null) {
+            connection.close();
+         }
+         brokerService.stop();
+      }
+      // Measures used heap before and after sending 10000 messages and asserts the delta stays below 5*MEM_LIMIT (25 MiB).
+      @Test(timeout = 60000)
+      public void testHeapUsage() throws Exception {
+         Runtime.getRuntime().gc(); // request a collection before taking the baseline; the JVM treats this as a hint
+         final long initUsedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); // baseline used heap in bytes
+         sendMessages(10000); // 10000 x 5 KiB, roughly 48.8 MiB of payload in total
+         Runtime.getRuntime().gc(); // request a collection again so the delta is not dominated by collectable garbage
+         long usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() - initUsedMemory; // growth relative to the baseline
+         LOG.info("Mem in use: " + usedMem/1024  + "K");
+         assertTrue("Used Mem reasonable " + usedMem, usedMem < 5*MEM_LIMIT);
+      }
+      // Sends `count` BytesMessages, each carrying `payload`, to amqDestination using a producer that is closed afterwards.
+      protected void sendMessages(int count) throws JMSException {
+         MessageProducer producer = session.createProducer(amqDestination);
+         for (int i = 0; i < count; i++) {
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes(payload);
+            producer.send(bytesMessage);
+         }
+         producer.close(); // NOTE(review): not reached if send() throws; the producer is then only released via connection.close() in tearDown()
+      }
+
+}