You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/31 21:56:03 UTC

svn commit: r1488375 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/ activemq-j...

Author: chirino
Date: Fri May 31 19:56:03 2013
New Revision: 1488375

URL: http://svn.apache.org/r1488375
Log:
Additional fixes related to AMQ-4563: You can now configure the storeOpenWireVersion property of a broker to control which version of OpenWire is used by the persistence stores. This needs to be set to version 10 to preserve the original AMQP message IDs.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Fri May 31 19:56:03 2013
@@ -92,6 +92,7 @@ import org.apache.activemq.network.Conne
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.network.jms.JmsConnector;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.proxy.ProxyConnector;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.selector.SelectorParser;
@@ -239,6 +240,8 @@ public class BrokerService implements Se
     private boolean restartAllowed = true;
     private boolean restartRequested = false;
 
+    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
+
     static {
 
         try {
@@ -2880,4 +2883,12 @@ public class BrokerService implements Se
     public void requestRestart() {
         this.restartRequested = true;
     }
+
+    public int getStoreOpenWireVersion() {
+        return storeOpenWireVersion;
+    }
+
+    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
+        this.storeOpenWireVersion = storeOpenWireVersion;
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri May 31 19:56:03 2013
@@ -83,6 +83,9 @@ public class FilePendingMessageCursor ex
     @Override
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
+            if( this.broker != null) {
+                wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion());
+            }
             super.start();
             if (systemUsage != null) {
                 systemUsage.getMemoryUsage().addUsageListener(this);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Fri May 31 19:56:03 2013
@@ -65,6 +65,8 @@ public class SchedulerBroker extends Bro
         this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
         this.context.setBroker(next);
         this.systemUsage = brokerService.getSystemUsage();
+
+        wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
     }
 
     public synchronized JobScheduler getJobScheduler() throws Exception {

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Fri May 31 19:56:03 2013
@@ -304,6 +304,11 @@ public class JDBCPersistenceAdapter exte
     }
 
     public void doStart() throws Exception {
+
+        if( brokerService!=null ) {
+          wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
+        }
+
         // Cleanup the db periodically.
         if (cleanupPeriod > 0) {
             cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Fri May 31 19:56:03 2013
@@ -241,6 +241,10 @@ public class JournalPersistenceAdapter i
             return;
         }
 
+        if( brokerService!=null ) {
+          wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
+        }
+
         checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
             public boolean iterate() {
                 return doCheckpoint();

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri May 31 19:56:03 2013
@@ -177,6 +177,9 @@ public class KahaDBStore extends Message
 
     @Override
     public void doStart() throws Exception {
+        if( brokerService!=null ) {
+            wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
+        }
         super.doStart();
         this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
         this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri May 31 19:56:03 2013
@@ -62,13 +62,16 @@ import org.slf4j.LoggerFactory;
 public class KahaDBTransactionStore implements TransactionStore {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
     ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
-    private final WireFormat wireFormat = new OpenWireFormat();
     private final KahaDBStore theStore;
 
     public KahaDBTransactionStore(KahaDBStore theStore) {
         this.theStore = theStore;
     }
 
+    private WireFormat wireFormat(){
+      return this.theStore.wireFormat;
+    }
+
     public class Tx {
         private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
 
@@ -335,13 +338,13 @@ public class KahaDBTransactionStore impl
             for (Operation op : entry.getValue()) {
                 if (op.getClass() == AddOpperation.class) {
                     AddOpperation addOp = (AddOpperation) op;
-                    Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
+                    Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage()
                             .newInput()));
                     messageList.add(msg);
                 } else {
                     RemoveOpperation rmOp = (RemoveOpperation) op;
                     Buffer ackb = rmOp.getCommand().getAck();
-                    MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
+                    MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput()));
                     ackList.add(ack);
                 }
             }

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Fri May 31 19:56:03 2013
@@ -24,6 +24,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -60,9 +63,10 @@ import org.apache.activemq.wireformat.Wi
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
 
-public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
+public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
 
     private final WireFormat wireFormat = new OpenWireFormat();
+    private BrokerService brokerService;
 
     public void setBrokerName(String brokerName) {
     }
@@ -575,5 +579,17 @@ public class TempKahaDBStore extends Tem
     public long getLastProducerSequenceId(ProducerId id) {
         return -1;
     }
-        
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    public void load() throws IOException {
+        if( brokerService!=null ) {
+            wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
+        }
+        super.load();
+    }
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1488375&r1=1488374&r2=1488375&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Fri May 31 19:56:03 2013
@@ -206,7 +206,9 @@ class LevelDBStore extends LockableServi
   var snappyCompressLogs = false
 
   def doStart: Unit = {
-
+    if( brokerService!=null ) {
+      wireFormat.setVersion(brokerService.getStoreOpenWireVersion)
+    }
     snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
     debug("starting")