You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/23 18:18:20 UTC

svn commit: r757457 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker: ./ openwire/ protocol/ stomp/ store/

Author: chirino
Date: Mon Mar 23 17:18:19 2009
New Revision: 757457

URL: http://svn.apache.org/viewvc?rev=757457&view=rev
Log:
More store interface updates.


Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 23 17:18:19 2009
@@ -16,9 +16,8 @@
  */
 package org.apache.activemq.broker;
 
-import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
 
 public interface MessageDelivery {
 
@@ -35,15 +34,7 @@
     public <T> T asType(Class<T> type);
 
     public boolean isPersistent();
-    
-    /**
-     * Assigns a tracking number to this MessageDelivery. Tracking numbers
-     * are assigned sequentially and are unique within the broker. 
-     */
-    public void setTrackingNumber(long tracking);
-    
-    public long getTrackingNumber();
-    
+
     /**
      * Returns true if this message requires acknowledgement.
      */
@@ -54,14 +45,6 @@
      * been met. This method must not block. 
      */
     public void onMessagePersisted();
-
-    /**
-     * Returns the message's buffer representation.
-     * @return
-     */
-    public Buffer getMessageBuffer();
-
-    public AsciiBuffer getEncoding();
-
-    public long getStreamId();
+    
+    public Store.Session.MessageRecord createMessageRecord();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 23 17:18:19 2009
@@ -18,9 +18,9 @@
 
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
 
 public class OpenWireMessageDelivery implements MessageDelivery {
 
@@ -29,7 +29,6 @@
     private final Message message;
     private Destination destination;
     private AsciiBuffer producerId;
-    private long tracking;
     private PersistListener persistListener = null;
 
     public interface PersistListener {
@@ -89,23 +88,6 @@
         return message.isPersistent();
     }
 
-    public void setTrackingNumber(long tracking) {
-        this.tracking = tracking;
-    }
-
-    public long getTrackingNumber() {
-        return tracking;
-    }
-
-    /**
-     * Returns the message's buffer representation.
-     * 
-     * @return
-     */
-    public Buffer getMessageBuffer() {
-        throw new UnsupportedOperationException("Not yet implemented");
-    }
-
     public final void onMessagePersisted() {
         if (persistListener != null) {
             persistListener.onMessagePersisted(this);
@@ -117,12 +99,15 @@
         return message.isResponseRequired();
     }
 
-    public AsciiBuffer getEncoding() {
-        return ENCODING;
-    }
 
-    public long getStreamId() {
-        return 0;
+    public MessageRecord createMessageRecord() {
+        MessageRecord record = new MessageRecord();
+        record.setEncoding(ENCODING);
+        // TODO: Serialize it..
+        // record.setBuffer()
+        // record.setStreamKey(stream);
+        record.setMessageId(getMsgId());
+        return record;
     }
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Mar 23 17:18:19 2009
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.openwire;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 
 import javax.jms.InvalidSelectorException;
@@ -31,6 +30,7 @@
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.protocol.ProtocolHandler;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -542,4 +542,8 @@
     public void setWireFormat(WireFormat wireFormat) {
         this.wireFormat = (OpenWireFormat) wireFormat;
     }
+
+    public MessageDelivery createMessageDelivery(MessageRecord record) {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Mar 23 17:18:19 2009
@@ -18,11 +18,16 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.wireformat.WireFormat;
 
 public interface ProtocolHandler extends Service {
+    
     public void setConnection(BrokerConnection connection);
     public void onCommand(Object command);
     public void onException(Exception error);
     public void setWireFormat(WireFormat wf);
+    
+    public MessageDelivery createMessageDelivery(Store.Session.MessageRecord record);
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 23 17:18:19 2009
@@ -18,8 +18,8 @@
 
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
 
@@ -33,7 +33,6 @@
     private String receiptId;
     private int priority = Integer.MIN_VALUE;
     private AsciiBuffer msgId;
-    private long tracking = -1;
     private PersistListener persistListener = null;
 
     public interface PersistListener {
@@ -112,24 +111,6 @@
         return "true".equals(p);
     }
 
-    public long getTrackingNumber() {
-        return tracking;
-    }
-
-    public void setTrackingNumber(long tracking) {
-        this.tracking = tracking;
-    }
-
-    /**
-     * Returns the message's buffer representation.
-     * 
-     * @return
-     */
-    public Buffer getMessageBuffer() {
-        // Todo use asType() instead?
-        throw new UnsupportedOperationException("not yet implemented");
-    }
-
     public boolean isResponseRequired() {
         return receiptId != null;
     }
@@ -141,11 +122,13 @@
         }
     }
 
-    public AsciiBuffer getEncoding() {
-        return ENCODING;
-    }
-
-    public long getStreamId() {
-        return 0;
+    public MessageRecord createMessageRecord() {
+        MessageRecord record = new MessageRecord();
+        record.setEncoding(ENCODING);
+        // TODO: Serialize it..
+        // record.setBuffer()
+        // record.setStreamKey(stream);
+        record.setMessageId(getMsgId());
+        return record;
     }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 23 17:18:19 2009
@@ -21,7 +21,6 @@
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -36,6 +35,7 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.broker.protocol.ProtocolHandler;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.flow.Flow;
@@ -483,4 +483,8 @@
         // TODO Auto-generated method stub
         return null;
     }
+    
+    public MessageDelivery createMessageDelivery(MessageRecord record) {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Mon Mar 23 17:18:19 2009
@@ -26,6 +26,7 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store.Callback;
 import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.broker.store.Store.Session.QueueNotFoundException;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.ISourceController;
@@ -341,12 +342,7 @@
         protected void doExcecute(Session session) {
             // TODO need to get at protocol buffer.
             
-            Session.MessageRecord record = new Session.MessageRecord();
-            record.setMessageId(delivery.getMsgId());
-            record.setEncoding(delivery.getEncoding());
-            record.setBuffer(delivery.getMessageBuffer());
-            record.setStreamKey(delivery.getStreamId());
-            
+            MessageRecord record = delivery.createMessageRecord();
             Long key = session.messageAdd(record);
             for(DeliveryTarget target : targets)
             {