You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/08/28 19:20:25 UTC

svn commit: r1518289 - in /activemq/trunk: activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/ activemq-unit-tests/src/test/java/org/apache/activemq/broker/

Author: chirino
Date: Wed Aug 28 17:20:25 2013
New Revision: 1518289

URL: http://svn.apache.org/r1518289
Log:
Adding a LevelDB version of the RedeliveryRestartTest. Implemented redelivery tracking in the leveldb store.

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java
Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1518289&r1=1518288&r2=1518289&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed Aug 28 17:20:25 2013
@@ -41,7 +41,7 @@ case class DataLocator(pos:Long, len:Int
 case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
   var locator:DataLocator = _
 }
-case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
+case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long, deliveries:Int=0)
 case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
 case class QueueEntryRange()
 case class SubAckRecord(subKey:Long, ackPosition:Long)
@@ -308,6 +308,26 @@ class DelayableUOW(val manager:DBManager
     countDownFuture
   }
 
+  def incrementRedelivery(expectedQueueKey:Long, id:MessageId) = {
+    if( id.getEntryLocator != null ) {
+      val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
+      assert(queueKey == expectedQueueKey)
+      val counter = manager.client.getDeliveryCounter(queueKey, queueSeq)
+      val entry = QueueEntryRecord(id, queueKey, queueSeq, counter+1)
+      val a = this.synchronized {
+        val action = getAction(entry.id)
+        action.enqueues += entry
+        delayableActions += 1
+        action
+      }
+      manager.dispatchQueue {
+        manager.cancelable_enqueue_actions.put(key(entry), a)
+        a.addToPendingStore()
+      }
+    }
+    countDownFuture
+  }
+
   def dequeue(expectedQueueKey:Long, id:MessageId) = {
     if( id.getEntryLocator != null ) {
       val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1518289&r1=1518288&r2=1518289&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Wed Aug 28 17:20:25 2013
@@ -85,6 +85,7 @@ object LevelDBClient extends Log {
   final val LOG_REMOVE_ENTRY        = 4.toByte
   final val LOG_DATA                = 5.toByte
   final val LOG_TRACE               = 6.toByte
+  final val LOG_UPDATE_ENTRY        = 7.toByte
 
   final val LOG_SUFFIX  = ".log"
   final val INDEX_SUFFIX  = ".index"
@@ -727,7 +728,7 @@ class LevelDBClient(store: LevelDBStore)
                     index.delete(data)
                     collectionMeta.remove(record.getKey)
 
-                  case LOG_ADD_ENTRY =>
+                  case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY =>
                     val record = decodeEntryRecord(data)
 
                     val index_record = new EntryRecord.Bean()
@@ -737,10 +738,12 @@ class LevelDBClient(store: LevelDBStore)
 
                     index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
 
-                    if ( record.hasValueLocation ) {
-                      logRefIncrement(record.getValueLocation)
+                    if( kind==LOG_ADD_ENTRY ) {
+                      if ( record.hasValueLocation ) {
+                        logRefIncrement(record.getValueLocation)
+                      }
+                      collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
                     }
-                    collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
 
                   case LOG_REMOVE_ENTRY =>
                     val record = decodeEntryRecord(data)
@@ -1150,6 +1153,33 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
+  def decodeQueueEntryMeta(value:EntryRecord.Getter):Int= {
+    if( value.hasMeta ) {
+      val is = new DataByteArrayInputStream(value.getMeta);
+      val metaVersion = is.readVarInt()
+      metaVersion match {
+        case 1 =>
+          return is.readVarInt()
+        case _ =>
+      }
+    }
+    return 0
+  }
+
+  def getDeliveryCounter(collectionKey: Long, seq:Long):Int = {
+    val ro = new ReadOptions
+    ro.fillCache(true)
+    ro.verifyChecksums(verifyChecksums)
+    val key = encodeEntryKey(ENTRY_PREFIX, collectionKey, encodeLong(seq))
+    var rc = 0
+    might_fail_using_index {
+      for( v <- index.get(key, ro) ) {
+        rc = decodeQueueEntryMeta(EntryRecord.FACTORY.parseUnframed(v))
+      }
+    }
+    return rc
+  }
+
   def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
     collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
       val seq = decodeLong(key)
@@ -1157,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore)
       val msg = getMessage(locator)
       msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
       msg.getMessageId().setDataLocator(locator)
+      msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
       func(msg)
     }
   }
@@ -1351,20 +1382,32 @@ class LevelDBClient(store: LevelDBStore)
           log_record.setEntryKey(new Buffer(key, 9, 8))
           log_record.setValueLocation(dataLocator.pos)
           log_record.setValueLength(dataLocator.len)
-          appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+          val kind = if (entry.deliveries==0) LOG_ADD_ENTRY else LOG_UPDATE_ENTRY
+          appender.append(kind, encodeEntryRecord(log_record.freeze()))
 
           val index_record = new EntryRecord.Bean()
           index_record.setValueLocation(dataLocator.pos)
           index_record.setValueLength(dataLocator.len)
 
+          // Store the delivery counter.
+          if( entry.deliveries!=0 ) {
+            val os = new DataByteArrayOutputStream()
+            os.writeVarInt(1) // meta data format version
+            os.writeVarInt(entry.deliveries)
+            index_record.setMeta(os.toBuffer)
+          }
+
           val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
           batch.put(key, index_data)
 
-          for (key <- logRefKey(dataLocator.pos, log_info)) {
-            logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
+          if( kind==LOG_ADD_ENTRY ) {
+            for (key <- logRefKey(dataLocator.pos, log_info)) {
+              logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
+            }
+            collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
           }
 
-          collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
           write_enqueue_total += System.nanoTime() - start
         }
 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1518289&r1=1518288&r2=1518289&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Wed Aug 28 17:20:25 2013
@@ -380,6 +380,7 @@ class LevelDBStore extends LockableServi
           if( prepared ) {
             store.preparedAcks.remove(ack.getLastMessageId)
           }
+          uow.incrementRedelivery(store.key, ack.getLastMessageId)
         }
       }
     }
@@ -452,16 +453,16 @@ class LevelDBStore extends LockableServi
       case null =>
         debug("on rollback, the transaction " + txid + " does not exist")
       case tx =>
-        if( tx.prepared ) {
-          val done = new CountDownLatch(1)
-          withUow { uow =>
-            for( action <- tx.commitActions.reverse ) {
-              action.rollback(uow)
-            }
-            uow.syncFlag = true
-            uow.addCompleteListener { done.countDown() }
+        val done = new CountDownLatch(1)
+        withUow { uow =>
+          for( action <- tx.commitActions.reverse ) {
+            action.rollback(uow)
           }
-          done.await()
+          uow.syncFlag = true
+          uow.addCompleteListener { done.countDown() }
+        }
+        done.await()
+        if( tx.prepared ) {
           db.removeTransactionContainer(tx.xacontainer_id)
         }
     }

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java?rev=1518289&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java Wed Aug 28 17:20:25 2013
@@ -0,0 +1,33 @@
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.leveldb.LevelDBStore;
+
+import java.io.IOException;
+
+/**
+ */
+public class LevelDBRedeliveryRestartTest extends RedeliveryRestartTest {
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        broker.setDestinationPolicy(policyMap);
+        LevelDBStore store = new LevelDBStore();
+        broker.setPersistenceAdapter(store);
+        broker.addConnector("tcp://0.0.0.0:0");
+    }
+
+    @Override
+    protected void stopBrokerWithStoreFailure() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    public static Test suite() {
+        return suite(LevelDBRedeliveryRestartTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java?rev=1518289&r1=1518288&r2=1518289&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java Wed Aug 28 17:20:25 2013
@@ -33,6 +33,8 @@ import org.apache.activemq.transport.fai
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class RedeliveryRestartTest extends BrokerRestartTestSupport {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
@@ -82,7 +84,7 @@ public class RedeliveryRestartTest exten
 
         // make failover aware of the restarted auto assigned port
         connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0)
-            .getPublishableConnectString());
+                .getPublishableConnectString());
 
         consumer = session.createConsumer(destination);
         for (int i = 0; i < 5; i++) {
@@ -125,11 +127,7 @@ public class RedeliveryRestartTest exten
         assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
         assertEquals("not a redelivery", false, msg.getJMSRedelivered());
 
-        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-
-        // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
-        kahaDBPersistenceAdapter.getStore().getJournal().close();
-        broker.waitUntilStopped();
+        stopBrokerWithStoreFailure();
 
         broker = createRestartedBroker();
         broker.start();
@@ -150,6 +148,14 @@ public class RedeliveryRestartTest exten
         connection.close();
     }
 
+    protected void stopBrokerWithStoreFailure() throws Exception {
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+
+        // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
+        kahaDBPersistenceAdapter.getStore().getJournal().close();
+        broker.waitUntilStopped();
+    }
+
     private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination destination = session.createQueue(destinationName);