You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/23 18:18:20 UTC
svn commit: r757457 - in
/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker:
./ openwire/ protocol/ stomp/ store/
Author: chirino
Date: Mon Mar 23 17:18:19 2009
New Revision: 757457
URL: http://svn.apache.org/viewvc?rev=757457&view=rev
Log:
More store interface updates.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 23 17:18:19 2009
@@ -16,9 +16,8 @@
*/
package org.apache.activemq.broker;
-import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.store.Store;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
public interface MessageDelivery {
@@ -35,15 +34,7 @@
public <T> T asType(Class<T> type);
public boolean isPersistent();
-
- /**
- * Assigns a tracking number to this MessageDelivery. Tracking numbers
- * are assigned sequentially and are unique within the broker.
- */
- public void setTrackingNumber(long tracking);
-
- public long getTrackingNumber();
-
+
/**
* Returns true if this message requires acknowledgement.
*/
@@ -54,14 +45,6 @@
* been met. This method must not block.
*/
public void onMessagePersisted();
-
- /**
- * Returns the message's buffer representation.
- * @return
- */
- public Buffer getMessageBuffer();
-
- public AsciiBuffer getEncoding();
-
- public long getStreamId();
+
+ public Store.Session.MessageRecord createMessageRecord();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 23 17:18:19 2009
@@ -18,9 +18,9 @@
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
import org.apache.activemq.command.Message;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
public class OpenWireMessageDelivery implements MessageDelivery {
@@ -29,7 +29,6 @@
private final Message message;
private Destination destination;
private AsciiBuffer producerId;
- private long tracking;
private PersistListener persistListener = null;
public interface PersistListener {
@@ -89,23 +88,6 @@
return message.isPersistent();
}
- public void setTrackingNumber(long tracking) {
- this.tracking = tracking;
- }
-
- public long getTrackingNumber() {
- return tracking;
- }
-
- /**
- * Returns the message's buffer representation.
- *
- * @return
- */
- public Buffer getMessageBuffer() {
- throw new UnsupportedOperationException("Not yet implemented");
- }
-
public final void onMessagePersisted() {
if (persistListener != null) {
persistListener.onMessagePersisted(this);
@@ -117,12 +99,15 @@
return message.isResponseRequired();
}
- public AsciiBuffer getEncoding() {
- return ENCODING;
- }
- public long getStreamId() {
- return 0;
+ public MessageRecord createMessageRecord() {
+ MessageRecord record = new MessageRecord();
+ record.setEncoding(ENCODING);
+ // TODO: Serialize it..
+ // record.setBuffer()
+ // record.setStreamKey(stream);
+ record.setMessageId(getMsgId());
+ return record;
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Mar 23 17:18:19 2009
@@ -17,7 +17,6 @@
package org.apache.activemq.broker.openwire;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import javax.jms.InvalidSelectorException;
@@ -31,6 +30,7 @@
import org.apache.activemq.broker.Router;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
import org.apache.activemq.broker.protocol.ProtocolHandler;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -542,4 +542,8 @@
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = (OpenWireFormat) wireFormat;
}
+
+ public MessageDelivery createMessageDelivery(MessageRecord record) {
+ throw new UnsupportedOperationException();
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Mar 23 17:18:19 2009
@@ -18,11 +18,16 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store;
import org.apache.activemq.wireformat.WireFormat;
public interface ProtocolHandler extends Service {
+
public void setConnection(BrokerConnection connection);
public void onCommand(Object command);
public void onException(Exception error);
public void setWireFormat(WireFormat wf);
+
+ public MessageDelivery createMessageDelivery(Store.Session.MessageRecord record);
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 23 17:18:19 2009
@@ -18,8 +18,8 @@
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
@@ -33,7 +33,6 @@
private String receiptId;
private int priority = Integer.MIN_VALUE;
private AsciiBuffer msgId;
- private long tracking = -1;
private PersistListener persistListener = null;
public interface PersistListener {
@@ -112,24 +111,6 @@
return "true".equals(p);
}
- public long getTrackingNumber() {
- return tracking;
- }
-
- public void setTrackingNumber(long tracking) {
- this.tracking = tracking;
- }
-
- /**
- * Returns the message's buffer representation.
- *
- * @return
- */
- public Buffer getMessageBuffer() {
- // Todo use asType() instead?
- throw new UnsupportedOperationException("not yet implemented");
- }
-
public boolean isResponseRequired() {
return receiptId != null;
}
@@ -141,11 +122,13 @@
}
}
- public AsciiBuffer getEncoding() {
- return ENCODING;
- }
-
- public long getStreamId() {
- return 0;
+ public MessageRecord createMessageRecord() {
+ MessageRecord record = new MessageRecord();
+ record.setEncoding(ENCODING);
+ // TODO: Serialize it..
+ // record.setBuffer()
+ // record.setStreamKey(stream);
+ record.setMessageId(getMsgId());
+ return record;
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 23 17:18:19 2009
@@ -21,7 +21,6 @@
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -36,6 +35,7 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.Router;
import org.apache.activemq.broker.protocol.ProtocolHandler;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.flow.Flow;
@@ -483,4 +483,8 @@
// TODO Auto-generated method stub
return null;
}
+
+ public MessageDelivery createMessageDelivery(MessageRecord record) {
+ throw new UnsupportedOperationException();
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=757457&r1=757456&r2=757457&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Mon Mar 23 17:18:19 2009
@@ -26,6 +26,7 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.store.Store.Callback;
import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.Session.MessageRecord;
import org.apache.activemq.broker.store.Store.Session.QueueNotFoundException;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.ISourceController;
@@ -341,12 +342,7 @@
protected void doExcecute(Session session) {
// TODO need to get at protocol buffer.
- Session.MessageRecord record = new Session.MessageRecord();
- record.setMessageId(delivery.getMsgId());
- record.setEncoding(delivery.getEncoding());
- record.setBuffer(delivery.getMessageBuffer());
- record.setStreamKey(delivery.getStreamId());
-
+ MessageRecord record = delivery.createMessageRecord();
Long key = session.messageAdd(record);
for(DeliveryTarget target : targets)
{