You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC
svn commit: r1504961 [4/11] - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/
src/main/java/org/apache/activeblaze/cluster/
src/main/java/org/apache/activeblaze/group/
src/main/java/org/apache/activeblaze/impl/destination/ s...
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Jul 19 18:44:21 2013
@@ -33,10 +33,7 @@ import org.apache.activeblaze.Processor;
import org.apache.activeblaze.Subscription;
import org.apache.activeblaze.SubscriptionHolder;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
-import org.apache.activeblaze.impl.processor.CompressionProcessor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.FragmentationProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.reliable.ReliableFactory;
import org.apache.activeblaze.impl.transport.BaseTransport;
import org.apache.activeblaze.impl.transport.TransportFactory;
@@ -44,52 +41,42 @@ import org.apache.activeblaze.util.Async
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.PropertyUtil;
import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
-import org.apache.activeblaze.wire.MemberData.MemberDataBean;
-import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.MessageBuffer;
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.MemberImpl;
+import org.apache.activeblaze.wire.Packet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
- * <P>
- * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
- *
+ * <p/>
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
+ * communication
*/
-public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel{
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
private String name;
protected Processor unicast;
private MemberImpl local;
private BlazeMessageListener inboxListener;
- protected Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
- 10000);
+ protected Map<String, SendRequest> messageRequests = new LRUCache<String, SendRequest>(10000);
private final List<SubscriptionHolder> queueMessageListeners = new CopyOnWriteArrayList<SubscriptionHolder>();
private Group group;
protected Buffer inboxAddress;
protected int inBoxPort;
protected final Object localMutex = new Object();
-
+
/**
* Constructor
- *
- * @param name
*/
protected BlazeGroupChannelImpl(String name) {
super();
this.name = name;
}
-
+
/**
- * @throws Exception
* @see org.apache.activeblaze.Service#init()
*/
- public void doInit() throws Exception{
+ public void doInit() throws Exception {
super.doInit();
String unicastURIStr = getConfiguration().getUnicastURI();
unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
@@ -107,40 +94,34 @@ public class BlazeGroupChannelImpl exten
this.local = createLocal(unicastURI);
this.group = createGroup();
}
-
- protected final Processor configureProcess(ChainedProcessor transport,String reliability) throws Exception{
+
+ protected final ChainedProcessor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
int maxPacketSize = getConfiguration().getMaxPacketSize();
- CompressionProcessor result = new CompressionProcessor();
+ ChainedProcessor result = getReliability(reliability);
result.setPrev(this);
result.setExceptionListener(this);
result.setMaxPacketSize(maxPacketSize);
- FragmentationProcessor fp = new FragmentationProcessor();
- fp.setMaxPacketSize(maxPacketSize);
- result.setEnd(fp);
- ChainedProcessor reliable = getReliability(reliability);
- result.setEnd(reliable);
result.setEnd(transport);
return result;
}
-
- protected ChainedProcessor getReliability(String reliability) throws Exception{
+
+ protected ChainedProcessor getReliability(String reliability) throws Exception {
DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
return reliable;
}
-
- protected MemberImpl createLocal(URI uri) throws Exception{
+
+ protected MemberImpl createLocal(URI uri) throws Exception {
return new MemberImpl(getId(), getName(), 0, 0, uri);
}
-
- protected Group createGroup(){
+
+ protected Group createGroup() {
return new Group(this);
}
-
+
/**
- * @throws Exception
* @see org.apache.activeblaze.Service#shutDown()
*/
- public void doShutDown() throws Exception{
+ public void doShutDown() throws Exception {
super.doShutDown();
if (this.group != null) {
this.group.shutDown();
@@ -149,43 +130,40 @@ public class BlazeGroupChannelImpl exten
this.unicast.shutDown();
}
}
-
+
/**
- * @throws Exception
* @see org.apache.activeblaze.Service#start()
*/
- public void doStart() throws Exception{
+ public void doStart() throws Exception {
super.doStart();
this.unicast.start();
this.group.start();
}
-
+
/**
- * @throws Exception
* @see org.apache.activeblaze.Service#stop()
*/
- public void doStop() throws Exception{
+ public void doStop() throws Exception {
super.doStop();
this.group.stop();
this.unicast.stop();
}
-
+
/**
* @return the name
*/
- public String getName(){
+ public String getName() {
synchronized (this.localMutex) {
return this.name;
}
}
-
+
/**
* set the name
- *
- * @param name
+ *
* @see org.apache.activeblaze.group.BlazeGroupChannel#setName(java.lang.String)
*/
- public void setName(String name){
+ public void setName(String name) {
synchronized (this.localMutex) {
this.name = name;
if (this.local != null) {
@@ -193,131 +171,115 @@ public class BlazeGroupChannelImpl exten
}
}
}
-
+
/**
* @return the inboxListener
*/
- public BlazeMessageListener getInboxListener(){
+ public BlazeMessageListener getInboxListener() {
return this.inboxListener;
}
-
+
/**
* @param inboxListener the inboxListener to set
*/
- public void setInboxListener(BlazeMessageListener inboxListener){
+ public void setInboxListener(BlazeMessageListener inboxListener) {
this.inboxListener = inboxListener;
}
-
+
/**
* @return this channel's configuration
* @see org.apache.activeblaze.group.BlazeGroupChannel#getConfiguration()
*/
- public BlazeGroupConfiguration getConfiguration(){
+ public BlazeGroupConfiguration getConfiguration() {
return (BlazeGroupConfiguration) this.configuration;
}
-
+
/**
* @return the member for this channel
* @see org.apache.activeblaze.group.BlazeGroupChannel#getLocalMember()
*/
- public MemberImpl getLocalMember(){
+ public MemberImpl getLocalMember() {
synchronized (this.localMutex) {
return this.local;
}
}
-
- protected void setLocalMember(MemberImpl local){
+
+ protected void setLocalMember(MemberImpl local) {
synchronized (this.localMutex) {
this.local = local;
}
}
-
+
/**
- * @param l
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
*/
- public void addMemberChangedListener(MemberChangedListener l) throws Exception{
+ public void addMemberChangedListener(MemberChangedListener l) throws Exception {
init();
this.group.addMemberChangedListener(l);
}
-
+
/**
- * @param l
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
*/
- public void removeMemberChangedListener(MemberChangedListener l) throws Exception{
+ public void removeMemberChangedListener(MemberChangedListener l) throws Exception {
init();
this.group.removeMemberChangedListener(l);
}
-
+
/**
- * @param id
* @return the Member
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
*/
- public Member getMemberById(String id) throws Exception{
+ public Member getMemberById(String id) throws Exception {
init();
return this.group.getMemberById(id);
}
-
+
/**
- * @param name
* @return the Member
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
*/
- public Member getMemberByName(String name) throws Exception{
+ public Member getMemberByName(String memberName) throws Exception {
init();
- return this.group.getMemberByName(name);
+ return this.group.getMemberByName(memberName);
}
-
+
/**
* Will wait for a member to advertise itself if not available
- *
- * @param name
- * @param timeout
+ *
* @return the member or null
- * @throws Exception
*/
- public Member getAndWaitForMemberByName(String name,int timeout) throws Exception{
+ public Member getAndWaitForMemberByName(String memberName, int timeout) throws Exception {
init();
- return this.group.getAndWaitForMemberByName(name, timeout);
+ return this.group.getAndWaitForMemberByName(memberName, timeout);
}
-
+
/**
* @return the members
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
*/
- public Set<Member> getMembers() throws Exception{
+ public Set<Member> getMembers() throws Exception {
init();
return this.group.getMembers();
}
-
+
/**
* Send a message to a member of the group - in a round-robin fashion
- *
- * @param destination
- * @param message
- * @throws Exception
- * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
+ *
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
+ * org.apache.activeblaze.BlazeMessage)
*/
- public void send(String destination,BlazeMessage message) throws Exception{
- send(new Destination(destination, false), message);
+ public void send(String destination, BlazeMessage message) throws Exception {
+ send(new Destination(new Buffer(destination), false), message);
}
-
+
/**
* Send a message to a member of the group - in a round-robin fashion
- *
- * @param destination
- * @param message
- * @throws Exception
- * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
+ *
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
+ * org.apache.activeblaze.BlazeMessage)
*/
- public void send(Destination destination,BlazeMessage message) throws Exception{
+ public void send(Destination destination, BlazeMessage message) throws Exception {
while (true) {
MemberImpl member = getQueueDestination(destination.getName());
if (member != null) {
@@ -332,90 +294,66 @@ public class BlazeGroupChannelImpl exten
}
}
}
-
+
/**
- * @param member
- * @param message
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage)
*/
- public void send(Member member,BlazeMessage message) throws Exception{
- send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
+ public void send(Member member, BlazeMessage message) throws Exception {
+ send((MemberImpl) member, member.getInBoxDestination(), message);
}
-
+
/**
- * @param member
- * @param message
* @return the response
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage)
*/
- public BlazeMessage sendRequest(Member member,BlazeMessage message) throws Exception{
+ public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
return sendRequest(member, message, 0);
}
-
+
/**
- * @param member
- * @param message
- * @param timeout
* @return the response
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage, int)
*/
- public BlazeMessage sendRequest(Member member,BlazeMessage message,int timeout) throws Exception{
- return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
+ public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
+ return sendRequest((MemberImpl) member, member.getInBoxDestination(), message, timeout);
}
-
+
/**
- * @param destination
- * @param message
* @return the response
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage)
*/
- public BlazeMessage sendRequest(String destination,BlazeMessage message) throws Exception{
+ public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
return sendRequest(new Destination(destination, false), message, 0);
}
-
+
/**
- * @param destination
- * @param message
* @return the response
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage)
*/
- public BlazeMessage sendRequest(Destination destination,BlazeMessage message) throws Exception{
+ public BlazeMessage sendRequest(Destination destination, BlazeMessage message) throws Exception {
return sendRequest(destination, message, 0);
}
-
+
/**
- * @param destination
- * @param message
- * @param timeout
* @return the request
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage, int)
*/
- public BlazeMessage sendRequest(String destination,BlazeMessage message,int timeout) throws Exception{
+ public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
return sendRequest(new Destination(destination, false), message, timeout);
}
-
+
/**
- * @param destination
- * @param message
- * @param timeout
* @return the response
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage, int)
*/
- public BlazeMessage sendRequest(Destination destination,BlazeMessage message,int timeout) throws Exception{
+ public BlazeMessage sendRequest(Destination destination, BlazeMessage message, int timeout) throws Exception {
Buffer key = destination.getName();
long deadline = 0;
long waitTime = timeout;
@@ -426,7 +364,7 @@ public class BlazeGroupChannelImpl exten
MemberImpl member = getQueueDestination(key);
if (member != null) {
try {
- BlazeMessage result = sendRequest(member, key, message, (int) waitTime);
+ BlazeMessage result = sendRequest(member, destination, message, (int) waitTime);
if (result != null) {
return result;
}
@@ -442,334 +380,275 @@ public class BlazeGroupChannelImpl exten
}
return null;
}
-
+
/**
* send Request
- *
- * @param member
- * @param destinationName
- * @param message
- * @param timeout
+ *
* @return the response
- * @throws Exception
*/
- public BlazeMessage sendRequest(MemberImpl member,Buffer destinationName,BlazeMessage message,int timeout)
- throws Exception{
+ public BlazeMessage sendRequest(MemberImpl member, String destinationName, BlazeMessage message, int timeout)
+ throws Exception {
+ return sendRequest(member, new Destination(destinationName, false), message, timeout);
+ }
+
+ /**
+ * send Request
+ *
+ * @return the response
+ */
+ public BlazeMessage sendRequest(MemberImpl member, Destination dest, BlazeMessage message, int timeout)
+ throws Exception {
BlazeMessage result = null;
if (member != null) {
- SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
- Destination dest = new Destination(destinationName, false);
- Packet packet = buildPacket(dest, message);
+ SendRequest request = new SendRequest();
+ preProcessMessage(((MemberImpl) member).getAddress(), dest, message);
synchronized (this.messageRequests) {
- this.messageRequests.put(packet.getPacketData().getMessageId(), request);
+ this.messageRequests.put(message.getId(), request);
}
-
- packet.setTo((member).getAddress());
- this.unicast.downStream(packet);
- PacketDataBuffer response = request.get(timeout);
- result = buildBlazeMessage(response);
+ this.unicast.downStream(message);
+ result = (BlazeMessage) request.get(timeout);
}
return result;
}
-
+
/**
- * @param to
- * @param response
- * @param correlationId
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage, java.lang.String)
*/
- public void sendReply(Member to,BlazeMessage response,String correlationId) throws Exception{
+ public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
Destination dest = new Destination(to.getInBoxDestination(), false);
- Packet packet = buildPacket(dest,response,correlationId);
- packet.setTo(((MemberImpl) to).getAddress());
- this.unicast.downStream(packet);
+ preProcessMessage(((MemberImpl) to).getAddress(), dest, response);
+ response.setCorrelationId(correlationId);
+ this.unicast.downStream(response);
}
-
- protected void send(MemberImpl member,Buffer destinationName,BlazeMessage message) throws Exception{
+
+ protected void send(MemberImpl member, String destinationName, BlazeMessage message) throws Exception {
+ send(member, new Buffer(destinationName), message);
+ }
+
+ protected void send(MemberImpl member, Buffer destinationName, BlazeMessage message) throws Exception {
Destination dest = new Destination(destinationName, false);
- Packet packet = buildPacket(dest, message,true);
- packet.setTo(member.getAddress());
- this.unicast.downStream(packet);
+ preProcessMessage(member.getAddress(), dest, message);
+ this.unicast.downStream(message);
+ }
+
+ protected void preProcessMessage(InetSocketAddress to, Destination destination, BlazeMessage message) {
+ preProcessMessage(destination, message);
+ message.setTo(to);
}
-
+
/**
- * @param destination
- * @param l
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
- * org.apache.activeblaze.group.BlazeMessageListener)
+ * BlazeMessageListener)
*/
- public void addBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
+ public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
init();
SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
-
this.queueMessageListeners.add(key);
-
buildLocal();
}
-
+
/**
- * @param subscription
- * @param l
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
- * org.apache.activeblaze.group.BlazeMessageListener)
+ * BlazeMessageListener)
*/
- public void addBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
+ public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
init();
SubscriptionHolder key = new SubscriptionHolder(subscription, l);
this.queueMessageListeners.add(key);
-
buildLocal();
}
-
+
/**
- * @param destination
- * @param l
- * @throws Exception
- * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String, BlazeMessageListener)
*/
- public void removeBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
+ public void removeBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
init();
SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
-
this.queueMessageListeners.remove(key);
-
buildLocal();
-
}
-
+
/**
- * @param subscription
- * @param l
- * @throws Exception
- * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String, BlazeMessageListener)
*/
- public void removeBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
+ public void removeBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
init();
-
SubscriptionHolder key = new SubscriptionHolder(subscription, l);
this.queueMessageListeners.remove(key);
-
buildLocal();
-
}
-
+
/**
- * @param destination
- * @param l
- * @throws Exception
* @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
* org.apache.activeblaze.BlazeMessageListener)
*/
- public void addBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
+ public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
init();
super.addBlazeTopicMessageListener(destination, l);
buildLocal();
}
-
+
/**
- * @param destination
- * @return the removed <Code>BlazeTopicListener</Code>
- * @throws Exception
- * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
+ * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String, BlazeMessageListener)
*/
- public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
+ public void removeBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
init();
super.removeBlazeTopicMessageListener(destination, l);
buildLocal();
-
}
-
+
/**
- * @param groupName
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#addToGroup(java.lang.String)
*/
- public void addToGroup(String groupName) throws Exception{
+ public void addToGroup(String groupName) throws Exception {
init();
this.local.addToGroup(groupName);
}
-
+
/**
- * @param groupName
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#removeFromGroup(java.lang.String)
*/
- public void removeFromGroup(String groupName) throws Exception{
+ public void removeFromGroup(String groupName) throws Exception {
init();
this.local.removeFromGroup(groupName);
}
-
+
/**
* @return the groups
- * @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getGroups()
*/
- public List<String> getGroups() throws Exception{
+ public List<String> getGroups() throws Exception {
init();
return this.local.getGroups();
}
-
- protected void processData(String id,Buffer correlationId,PacketDataBuffer data) throws Exception{
+
+ protected void processPacket(Packet packet) throws Exception {
if (isStarted()) {
- if (!processRequest(correlationId, data)) {
- MessageType type = data.getMessageType();
- if (type == MessageType.BLAZE_DATA) {
- doProcessBlazeData(data);
- } else if (type == MessageType.MEMBER_DATA) {
- doProcessMemberData(data);
+ if (!processRequest(packet)) {
+ if (packet instanceof BlazeMessage) {
+ doProcessBlazeMessage((BlazeMessage) packet);
+ } else if (packet instanceof Member) {
+ doProcessMember((MemberImpl) packet);
}
}
}
}
-
- protected boolean processRequest(Buffer correlationId,PacketDataBuffer value){
+
+ protected boolean processRequest(Packet packet) {
boolean result = false;
- if (correlationId != null) {
- SendRequest<PacketDataBuffer> request = null;
+ String correlationId = packet.getCorrelationId();
+ if (correlationId != null && correlationId.length() > 0) {
+ SendRequest request = null;
synchronized (this.messageRequests) {
request = this.messageRequests.remove(correlationId);
}
if (request != null) {
- request.put(correlationId, value);
+ request.put(correlationId, packet);
result = true;
}
}
return result;
}
-
- protected void doProcessBlazeData(PacketData data) throws Exception{
-
- if (data.hasDestinationData()&&data.getDestinationData().getTopic()) {
- dispatch(data);
- } else {
-
- Buffer destinationName = data.getDestinationData().getName();
- BlazeMessage message = buildBlazeMessage(data);
- if (this.inboxListener != null && this.producerId.equals(destinationName)) {
- this.inboxListener.onMessage(message);
+
+ protected void doProcessBlazeMessage(BlazeMessage message) throws Exception {
+ Destination destination = message.getDestination();
+ if (destination != null) {
+ if (destination.isTopic()) {
+ dispatch(message);
} else {
-
- int index = 0;
- for (SubscriptionHolder entry : this.queueMessageListeners) {
- if (entry.getSubscription().matches(destinationName)) {
- entry.getListener().onMessage(message);
- this.queueMessageListeners.remove(index);
- this.queueMessageListeners.add(entry);
+ if (this.inboxListener != null && this.producerId.equals(destination.getName())) {
+ this.inboxListener.onMessage(message);
+ } else {
+ int index = 0;
+ for (SubscriptionHolder entry : this.queueMessageListeners) {
+ if (entry.getSubscription().matches(destination.getName())) {
+ entry.getListener().onMessage(message);
+ this.queueMessageListeners.remove(index);
+ this.queueMessageListeners.add(entry);
+ }
+ index++;
}
- index++;
}
-
}
}
}
-
- protected Group getGroup(){
+
+ protected Group getGroup() {
return this.group;
}
-
- protected BlazeMessage createMessage(String fromId){
+
+ protected BlazeMessage createMessage(String fromId) {
Member member = this.group.getMemberById(fromId);
BlazeMessage message = new BlazeGroupMessage(member);
return message;
}
-
- protected final void doProcessMemberData(PacketData data) throws Exception{
- Buffer payload = data.getPayload();
- MemberDataBuffer memberData = MemberDataBuffer.parseUnframed(payload);
- this.group.processMember(memberData);
+
+ protected final void doProcessMember(MemberImpl member) throws Exception {
+ this.group.processMember(member);
}
-
+
/**
- * @param messageType
- * @param message
+ *
+ * @param packet
* @throws Exception
*/
- public void broadcastMessage(MessageType messageType,MessageBuffer message) throws Exception{
- PacketDataBean data = getPacketData(messageType, message);
- data.setReliable(false);
- Packet packet = new Packet(data.freeze());
+ public void broadcastManagementMessage(Packet packet) throws Exception {
this.broadcast.downStreamManagement(packet);
}
-
+
/**
* send a message
- *
- * @param asyncRequest
- * @param member
- * @param messageType
- * @param message
- * @throws Exception
*/
- public void sendMessage(AsyncGroupRequest asyncRequest,MemberImpl member,MessageType messageType,
- MessageBuffer message) throws Exception{
- SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
- PacketDataBean data = getPacketData(messageType, message);
- asyncRequest.add(data.getMessageId(), request);
+ public void sendGroupRequest(AsyncGroupRequest asyncRequest, MemberImpl member, Packet packet) throws Exception {
+ SendRequest request = new SendRequest();
+ asyncRequest.add(packet.getId(), request);
synchronized (this.messageRequests) {
- this.messageRequests.put(data.getMessageId(), request);
+ this.messageRequests.put(packet.getId(), request);
}
- data.setReliable(false);
- Packet packet = new Packet(data.freeze());
+ packet.setReliable(true);
packet.setTo(member.getAddress());
this.unicast.downStream(packet);
}
-
+
/**
* broadcast a general message
- *
- * @param messageType
- * @param message
- * @param correlationId
- * @throws Exception
*/
- public void broadcastMessage(MessageType messageType,MessageBuffer message,String correlationId) throws Exception{
- PacketDataBean data = getPacketData(messageType, message);
- data.setCorrelationId(new Buffer(correlationId));
- data.setReliable(true);
- Packet packet = new Packet(data.freeze());
+ public void broadcastManagementMessage(Packet packet, String correlationId) throws Exception {
+ packet.setCorrelationId(correlationId);
+ packet.setReliable(true);
this.broadcast.downStreamManagement(packet);
}
-
+
/**
* @param to
- * @param messageType
- * @param message
+ * @param packet
* @throws Exception
*/
- public void sendMessage(InetSocketAddress to,MessageType messageType,MessageBuffer message) throws Exception{
- PacketDataBean data = getPacketData(messageType, message);
- data.setReliable(false);
- Packet packet = new Packet(data.freeze());
+ public void sendManagementMessage(InetSocketAddress to, Packet packet) throws Exception {
+ packet.setReliable(false);
packet.setTo(to);
this.unicast.downStream(packet);
}
-
+
/**
* @param to
- * @param messageType
- * @param message
+ * @param packet
* @param correlationId
* @throws Exception
*/
- public void sendReply(MemberImpl to,MessageType messageType,MessageBuffer message,String correlationId)
- throws Exception{
- PacketDataBean data = getPacketData(messageType, message);
- data.setCorrelationId(new Buffer(correlationId));
- data.setReliable(false);
- Packet packet = new Packet(data.freeze());
+ public void sendReply(MemberImpl to, Packet packet, String correlationId) throws Exception {
+ packet.setReliable(false);
packet.setTo(to.getAddress());
+ packet.setCorrelationId(correlationId);
this.unicast.downStream(packet);
}
-
- protected MemberImpl getQueueDestination(Buffer destination){
+
+ protected MemberImpl getQueueDestination(Buffer destination) {
// choose a member
MemberImpl result = null;
Map<Subscription, List<MemberImpl>> map = this.group.getQueueMap();
- Subscription key = new Subscription(destination);
+ Subscription key = new Subscription(destination, false);
List<MemberImpl> list = map.get(key);
if (list == null) {
// search through wildcard matches
@@ -787,37 +666,23 @@ public class BlazeGroupChannelImpl exten
}
return result;
}
-
- protected void buildLocal(){
+
+ protected void buildLocal() {
if (isInitialized()) {
try {
synchronized (this.localMutex) {
-
- MemberDataBean bean = getLocalMember().getData().copy();
- bean.clearSubscriptionData();
+ MemberImpl result = getLocalMember().clone();
+ result.clearSubscriptions();
// add topic destinations
for (SubscriptionHolder s : this.topicMessageListeners) {
- bean.addSubscriptionData(s.getSubscription().getData());
+ result.addSubscription(s.getSubscription());
}
// add Queue Destinations
-
for (SubscriptionHolder s : this.queueMessageListeners) {
- bean.addSubscriptionData(s.getSubscription().getData());
+ result.addSubscription(s.getSubscription());
}
-
- MemberImpl result = new MemberImpl(bean.freeze());
-
this.group.processMemberUpdate(this.local, result);
-
- bean = bean.copy();
- bean.setSubscriptionsChanged(true);
- result = new MemberImpl(bean.freeze());
this.group.broadcastHeartBeat(result);
-
- bean = bean.copy();
- bean.clearSubscriptionsChanged();
- result = new MemberImpl(bean.freeze());
-
this.local = result;
this.group.updateLocal(this.local);
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Fri Jul 19 18:44:21 2013
@@ -20,13 +20,12 @@ import org.apache.activeblaze.BlazeConfi
/**
* Configuration for a BlazeGroupChannel
- *
*/
public class BlazeGroupConfiguration extends BlazeConfiguration {
private int heartBeatInterval = 1000;
private String unicastURI = "udp://localhost:0";
//reliability
- private String reliableUnicast ="swp";
+ private String reliableUnicast = "swp";
/**
* @return the heartBeatInterval
@@ -36,8 +35,7 @@ public class BlazeGroupConfiguration ext
}
/**
- * @param heartBeatInterval
- * the heartBeatInterval to set
+ * @param heartBeatInterval the heartBeatInterval to set
*/
public void setHeartBeatInterval(int heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
@@ -55,8 +53,7 @@ public class BlazeGroupConfiguration ext
}
/**
- * @param unicastURI
- * the unicastURI to set
+ * @param unicastURI the unicastURI to set
*/
public void setUnicastURI(String unicastURI) {
this.unicastURI = unicastURI;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java Fri Jul 19 18:44:21 2013
@@ -16,27 +16,26 @@
*/
package org.apache.activeblaze.group;
-import org.apache.activeblaze.BlazeException;
+import java.io.IOException;
+
import org.apache.activeblaze.BlazeMessage;
/**
* Has information about the sender of the Message
* This type of message is created on receiver
- *
*/
public class BlazeGroupMessage extends BlazeMessage {
private final Member sender;
-
-
+
+
/**
* Constructor
- * @param sender
*/
- public BlazeGroupMessage(Member sender){
- this.sender=sender;
+ public BlazeGroupMessage(Member sender) {
+ this.sender = sender;
}
-
- public BlazeMessage copy() throws BlazeException{
+
+ protected BlazeMessage copy() throws IOException {
BlazeMessage copy = new BlazeGroupMessage(this.sender);
copy(copy);
return copy;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java Fri Jul 19 18:44:21 2013
@@ -18,18 +18,15 @@ package org.apache.activeblaze.group;
/**
* A Default listener for membership changes to a group
- *
*/
public class DefaultMemberChangedListener implements MemberChangedListener {
/**
- * @param member
* @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
*/
public void memberStarted(Member member) {
}
/**
- * @param member
* @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
*/
public void memberStopped(Member member) {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Fri Jul 19 18:44:21 2013
@@ -16,6 +16,7 @@
*/
package org.apache.activeblaze.group;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -31,21 +32,18 @@ import java.util.concurrent.ThreadFactor
import org.apache.activeblaze.BaseService;
import org.apache.activeblaze.Subscription;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.SubscriptionData;
-import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
+import org.apache.activeblaze.wire.MemberImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Maintains members of a group
- *
*/
public class Group extends BaseService {
static final Log LOG = LogFactory.getLog(Group.class);
final BlazeGroupChannelImpl channel;
private final BlazeGroupConfiguration configuration;
- private Timer heartBeatTimer;
+ Timer heartBeatTimer;
private Timer checkMemberShipTimer;
protected Map<String, MemberImpl> members = new ConcurrentHashMap<String, MemberImpl>();
private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
@@ -56,11 +54,6 @@ public class Group extends BaseService {
/**
* Constructor
- *
- * @param local
- * @param channel
- * @param transport
- * @param config
*/
protected Group(BlazeGroupChannelImpl channel) {
this.channel = channel;
@@ -130,9 +123,6 @@ public class Group extends BaseService {
/**
* Get a member by its unique id
- *
- * @param id
- * @return
*/
Member getMemberById(String id) {
return this.members.get(id);
@@ -140,9 +130,6 @@ public class Group extends BaseService {
/**
* Return a member of the Group with the matching name
- *
- * @param name
- * @return
*/
Member getMemberByName(String name) {
if (name != null) {
@@ -157,11 +144,8 @@ public class Group extends BaseService {
/**
* Will wait for a member to advertise itself if not available
- *
- * @param name
- * @param timeout
+ *
* @return the member or null
- * @throws InterruptedException
*/
public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
Member result = null;
@@ -187,8 +171,6 @@ public class Group extends BaseService {
}
/**
- *
- * @throws Exception
* @see org.apache.activeblaze.Service#init()
*/
public void doInit() throws Exception {
@@ -204,8 +186,6 @@ public class Group extends BaseService {
}
/**
- *
- * @throws Exception
* @see org.apache.activeblaze.Service#shutDown()
*/
public void doShutDown() throws Exception {
@@ -216,8 +196,6 @@ public class Group extends BaseService {
}
/**
- *
- * @throws Exception
* @see org.apache.activeblaze.Service#start()
*/
public void doStart() throws Exception {
@@ -250,21 +228,21 @@ public class Group extends BaseService {
}
/**
- *
- * @throws Exception
* @see org.apache.activeblaze.Service#stop()
*/
public void doStop() throws Exception {
if (this.heartBeatTimer != null) {
- // Make sure we shutdown the timer before shutting down the down stream
+ // Make sure we shutdown the timer before shutting down the down
+ // stream
// processors to avoid the timer getting errors.
final CountDownLatch done = new CountDownLatch(1);
- this.heartBeatTimer.schedule(new TimerTask(){
+ this.heartBeatTimer.schedule(new TimerTask() {
@Override
public void run() {
- heartBeatTimer.cancel();
+ Group.this.heartBeatTimer.cancel();
done.countDown();
- }}, 0);
+ }
+ }, 0);
done.await();
this.heartBeatTimer = null;
}
@@ -273,34 +251,32 @@ public class Group extends BaseService {
}
}
+ /**
+ * @return the String representation
+ * @see java.lang.Object#toString()
+ */
public String toString() {
return "Group " + getLocalMember().getName();
}
/**
* Process a new member
- *
- * @param data
- * @throws Exception
+ *
* @return Member if a new member else null
*/
- protected final MemberImpl processMember(MemberDataBuffer data) throws Exception {
+ protected final MemberImpl processMember(MemberImpl member) throws Exception {
MemberImpl result = null;
MemberImpl old = null;
- MemberImpl member = new MemberImpl(data);
if (!member.getId().equals(getLocalMember().getId()) && isInOurGroup(member)) {
member.setTimeStamp(System.currentTimeMillis());
if ((old = this.members.put(member.getId(), member)) == null) {
processMemberStarted(member);
if (!member.getId().equals(this.channel.getId())) {
- this.channel.sendMessage(member.getAddress(), MessageType.MEMBER_DATA, this.channel
- .getLocalMember().getData().freeze());
+ this.channel.sendManagementMessage(member.getAddress(), this.channel.getLocalMember());
}
result = member;
} else {
- if (data.getSubscriptionsChanged()) {
- processMemberUpdate(old, member);
- }
+ processMemberUpdate(old, member);
}
}
return result;
@@ -348,7 +324,7 @@ public class Group extends BaseService {
}
protected void processMemberStarted(MemberImpl member) throws Exception {
- processDestinationsForStarted(member);
+ processDestinationsForStarted(member, member.getSubscriptions());
fireMemberStarted(member);
synchronized (this.members) {
this.members.notifyAll();
@@ -357,58 +333,68 @@ public class Group extends BaseService {
protected void processMemberStopped(MemberImpl member) throws Exception {
fireMemberStopped(member);
- processDestinationsForStopped(member);
+ processDestinationsForStopped(member, member.getSubscriptions());
}
- private void processDestinationsForStarted(MemberImpl member) {
- List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
- if( subscriptionList == null ) {
+ private void processDestinationsForStarted(MemberImpl member, List<Subscription> subscriptionList) {
+ if (subscriptionList == null) {
return;
}
- for (SubscriptionData subData : subscriptionList) {
+ for (Subscription sub : subscriptionList) {
Map<Subscription, List<MemberImpl>> map = null;
- if (subData.getDestinationData().getTopic()) {
+ if (sub.getDestination().isTopic()) {
map = this.topicMap;
} else {
map = this.queueMap;
}
- Subscription key = new Subscription(subData.copy());
- List<MemberImpl> members = map.get(key);
- if (members == null) {
- members = new CopyOnWriteArrayList<MemberImpl>();
- map.put(key, members);
+ List<MemberImpl> membersList = map.get(sub);
+ if (membersList == null) {
+ membersList = new CopyOnWriteArrayList<MemberImpl>();
+ map.put(sub, membersList);
}
- members.add(member);
+ membersList.add(member);
}
}
- private void processDestinationsForStopped(MemberImpl member) {
- List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
- if( subscriptionList == null ) {
+ private void processDestinationsForStopped(MemberImpl member, List<Subscription> subscriptionList) {
+ if (subscriptionList == null) {
return;
}
- for (SubscriptionData subData : subscriptionList) {
+ for (Subscription sub : subscriptionList) {
Map<Subscription, List<MemberImpl>> map = null;
- if (subData.getDestinationData().getTopic()) {
+ if (sub.getDestination().isTopic()) {
map = this.topicMap;
} else {
map = this.queueMap;
}
- Subscription key = new Subscription(subData.copy());
- List<MemberImpl> members = map.get(key);
- if (members != null) {
- members.remove(member);
- if (members.isEmpty()) {
- map.remove(key);
+ List<MemberImpl> membersList = map.get(sub);
+ if (membersList != null) {
+ membersList.remove(member);
+ if (membersList.isEmpty()) {
+ map.remove(sub);
}
}
}
}
protected void processMemberUpdate(MemberImpl oldMember, MemberImpl newMember) throws Exception {
- //check for deltas
- processDestinationsForStopped(oldMember);
- processDestinationsForStarted(newMember);
+ // check for deltas
+ List<Subscription> oldList = oldMember.getSubscriptions();
+ List<Subscription> newList = newMember.getSubscriptions();
+ List<Subscription> removed = new ArrayList<Subscription>();
+ for (Subscription s : oldList) {
+ if (!newList.contains(s)) {
+ removed.add(s);
+ }
+ }
+ processDestinationsForStopped(oldMember, removed);
+ List<Subscription> added = new ArrayList<Subscription>();
+ for (Subscription s : newList) {
+ if (!oldList.contains(s)) {
+ added.add(s);
+ }
+ }
+ processDestinationsForStarted(newMember, added);
}
/**
@@ -427,7 +413,7 @@ public class Group extends BaseService {
protected void broadcastHeartBeat(MemberImpl local) throws Exception {
if (isStarted()) {
- Group.this.channel.broadcastMessage(MessageType.MEMBER_DATA, local.getData());
+ Group.this.channel.broadcastManagementMessage(local);
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java Fri Jul 19 18:44:21 2013
@@ -20,12 +20,11 @@ import java.util.List;
/**
- *A <CODE>Member</CODE> holds information about a member of the group
- *A Member has to be added to a group to interact with it
- *
+ * A <CODE>Member</CODE> holds information about a member of the group
+ * A Member has to be added to a group to interact with it
*/
public interface Member {
-
+
/**
* @return the name
*/
@@ -35,48 +34,51 @@ public interface Member {
* @return the id
*/
public String getId();
-
-
+
+
/**
* @return the startTime
*/
public long getStartTime();
-
+
/**
* @return the timeStamp
*/
long getTimeStamp();
-
+
/**
* Set the timestamp
- * @param value
*/
void setTimeStamp(long value);
+
/**
* @return the inbox destination
*/
public String getInBoxDestination();
-
-
+
+
/**
* This weight can be used to help select a master
* in the cluster - the highest weight becomes the master
+ *
* @return the masterWeight
*/
public long getMasterWeight();
-
+
/**
- * If there is two members have the same master weight,
+ * If there is two members have the same master weight,
* a secondary weight can be used
+ *
* @return refined weight
*/
public long getRefinedMasterWeight();
-
-
+
+
/**
* Get an array of groups
+ *
* @return an array of groups
*/
public List<String> getGroups();
-
+
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java Fri Jul 19 18:44:21 2013
@@ -20,19 +20,16 @@ package org.apache.activeblaze.group;
/**
* A listener for membership changes to a group
- *
*/
public interface MemberChangedListener {
-
+
/**
* Notification a member has started
- * @param member
*/
void memberStarted(Member member);
-
+
/**
* Notification a member has stopped
- * @param member
*/
void memberStopped(Member member);
}
\ No newline at end of file
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html Fri Jul 19 18:44:21 2013
@@ -19,7 +19,7 @@
</head>
<body>
-Group channel for communicating true point-to-point using unicast and
+Group channel for communicating true point-to-point using unicast and
Group membership
</body>
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java Fri Jul 19 18:44:21 2013
@@ -16,55 +16,48 @@
*/
package org.apache.activeblaze.impl.destination;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.UTF8Buffer;
+import org.apache.activeblaze.wire.Buffer;
+
/**
* Matches a Destination Subject to wildcards
- *
*/
public final class DestinationMatch {
static final byte MATCH_ELEMENT = '*';
static final byte MATCH_ALL = '>';
- static final byte[] DELIMETERS = { '.', '/', '|' };
+ //static final byte[] DELIMETERS = { '.', '/', '|' };
+ static final byte DELIMETER = '.';
/**
* See if the destination matches with wild cards
- *
- * @param destination
- * @param match
+ *
* @return true if its a match
*/
public static boolean isMatch(String destination, String match) {
- return isMatch(new UTF8Buffer(destination), new UTF8Buffer(match));
+ return isMatch(new Buffer(destination), new Buffer(match));
}
+
/**
* See if the destination matches with wild cards
- *
- * @param destination
- * @param match
+ *
* @return true if its a match
*/
public static boolean isMatch(Buffer destination, String match) {
- return isMatch(destination, new UTF8Buffer(match));
+ return isMatch(destination, new Buffer(match));
}
-
+
/**
* See if the destination matches with wild cards
- *
- * @param destination
- * @param match
+ *
* @return true if its a match
*/
public static boolean isMatch(String destination, Buffer match) {
- return isMatch(new UTF8Buffer(destination), match);
+ return isMatch(new Buffer(destination), match);
}
/**
* See if the destination matches with wild cards
- *
- * @param destination
- * @param match
+ *
* @return true if its a match
*/
public static boolean isMatch(Buffer destination, Buffer match) {
@@ -80,8 +73,8 @@ public final class DestinationMatch {
int matchOffset = 0;
while (destinationOffset < destination.length
&& matchOffset < match.length) {
- byte matchByte = match.byteAt(matchOffset);
- byte destinationByte = destination.byteAt(destinationOffset);
+ byte matchByte = match.data[match.offset + matchOffset];
+ byte destinationByte = destination.data[+destination.offset + destinationOffset];
if (matchByte != destinationByte || matchByte == MATCH_ALL || destinationByte == MATCH_ALL
|| matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT) {
if (matchByte == MATCH_ALL || destinationByte == MATCH_ALL) {
@@ -89,7 +82,7 @@ public final class DestinationMatch {
break;
} else if ((matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT)
&& (isMatchElement(match, matchOffset, match.length) || isMatchElement(destination,
- destinationOffset, destination.length))) {
+ destinationOffset, destination.length))) {
if (containsDelimeter(destination, destinationOffset, destination.length) == false
&& containsDelimeter(match, matchOffset, match.length) == false) {
break;
@@ -120,12 +113,12 @@ public final class DestinationMatch {
if (str != null && offset < str.length) {
byte offByte = str.byteAt(offset);
byte offBytePlusOne = (byte) ((offset + 1) < str.length ? str.byteAt(offset + 1) : ' ');
- if (offset + 1 < str.length && isDelimiter(offByte)) {
+ if (offset + 1 < str.length && DELIMETER == offByte) {
if (offBytePlusOne == MATCH_ALL) {
result = true;
}
} else if ((offByte == MATCH_ALL || offByte == MATCH_ELEMENT)
- && (isWhiteSpace(str, offset + 1, count) || isDelimiter(offBytePlusOne) || offset + 1 == str.length)) {
+ && (isWhiteSpace(str, offset + 1, count) || DELIMETER == offBytePlusOne || offset + 1 == str.length)) {
result = true;
}
}
@@ -135,8 +128,8 @@ public final class DestinationMatch {
static boolean isMatchElement(Buffer str, int offset, int count) {
boolean result = false;
if (str.byteAt(offset) == MATCH_ELEMENT) {
- if (offset == 0 || isDelimiter(str.byteAt(offset - 1))) {
- result = ((offset + 1) >= str.length) || isDelimiter(str.byteAt(offset + 1))
+ if (offset == 0 || str.byteAt(offset - 1) == DELIMETER) {
+ result = ((offset + 1) >= str.length) || str.byteAt(offset + 1) == DELIMETER
|| isWhiteSpace(str, offset + 1, count);
}
}
@@ -156,7 +149,7 @@ public final class DestinationMatch {
static int offsetToNextToken(Buffer str, int offset, int len) {
while (offset < len) {
- if (isDelimiter(str.byteAt(offset)))
+ if (str.byteAt(offset) == DELIMETER)
break;
offset++;
}
@@ -167,10 +160,10 @@ public final class DestinationMatch {
int result = -1;
int count = offset;
while (count < len) {
- if (isDelimiter(str.byteAt(count))) {
+ if (str.byteAt(count) == DELIMETER) {
result = ++count;
// check for double placed elements ...
- while (count < len && isDelimiter(str.byteAt(count))) {
+ while (count < len && str.byteAt(count) == DELIMETER) {
result = ++count;
}
break;
@@ -183,7 +176,7 @@ public final class DestinationMatch {
private static boolean containsDelimeter(Buffer str, int offset, int len) {
boolean result = false;
while (offset < len) {
- if (isDelimiter(str.byteAt(offset++))) {
+ if (str.byteAt(offset++) == DELIMETER) {
result = true;
break;
}
@@ -191,12 +184,4 @@ public final class DestinationMatch {
return result;
}
- private static final boolean isDelimiter(byte b) {
- for (int i = 0; i < DELIMETERS.length; i++) {
- if (b == DELIMETERS[i]) {
- return true;
- }
- }
- return false;
- }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<html>
<head>
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java Fri Jul 19 18:44:21 2013
@@ -16,6 +16,9 @@
*/
package org.apache.activeblaze.impl.network;
+import java.net.InetSocketAddress;
+import java.net.URI;
+
import org.apache.activeblaze.ExceptionListener;
import org.apache.activeblaze.Processor;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -26,12 +29,8 @@ import org.apache.activeblaze.impl.trans
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.net.InetSocketAddress;
-import java.net.URI;
-
/**
* Base class for network broadcast protocols
- *
*/
public abstract class BaseNetwork extends DefaultChainedProcessor implements Network, ExceptionListener {
protected static final Log LOG = LogFactory.getLog(BaseNetwork.class);
@@ -52,16 +51,13 @@ public abstract class BaseNetwork extend
}
/**
- * @param name
- * the name to set
+ * @param name the name to set
*/
public void setName(String name) {
this.name = name;
}
/**
- * @param uri
- * @throws Exception
* @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
*/
public void setManagementURI(URI uri) throws Exception {
@@ -69,8 +65,6 @@ public abstract class BaseNetwork extend
}
/**
- * @param uri
- * @throws Exception
* @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
*/
public void setURI(URI uri) throws Exception {
@@ -86,7 +80,6 @@ public abstract class BaseNetwork extend
}
/**
- * @param reliability
* @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
*/
public void setReliability(String reliability) {
@@ -95,8 +88,7 @@ public abstract class BaseNetwork extend
/**
* initialize the network
- *
- * @throws Exception
+ *
* @see org.apache.activeblaze.Service#init()
*/
public void doInit() throws Exception {
@@ -132,8 +124,6 @@ public abstract class BaseNetwork extend
}
/**
- *
- * @throws Exception
* @see org.apache.activeblaze.Service#shutDown()
*/
public void doShutDown() throws Exception {
@@ -147,8 +137,6 @@ public abstract class BaseNetwork extend
}
/**
- *
- * @throws Exception
* @see org.apache.activeblaze.Service#start()
*/
public void doStart() throws Exception {
@@ -162,8 +150,6 @@ public abstract class BaseNetwork extend
}
/**
- *
- * @throws Exception
* @see org.apache.activeblaze.Service#stop()
*/
public void doStop() throws Exception {
@@ -176,10 +162,8 @@ public abstract class BaseNetwork extend
}
}
-
/**
- * @param l
* @see org.apache.activeblaze.Processor#setExceptionListener(org.apache.activeblaze.ExceptionListener)
*/
public void setExceptionListener(ExceptionListener l) {
@@ -187,7 +171,6 @@ public abstract class BaseNetwork extend
}
/**
- * @param ex
* @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
*/
public void onException(Exception ex) {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Fri Jul 19 18:44:21 2013
@@ -17,13 +17,13 @@
package org.apache.activeblaze.impl.network;
import java.net.InetSocketAddress;
-import org.apache.activeblaze.impl.processor.Packet;
+
import org.apache.activeblaze.impl.transport.BaseTransport;
import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.wire.Packet;
/**
* Uses multicast to implement a Network
- *
*/
public class MulticastNetwork extends BaseNetwork {
/**
@@ -32,19 +32,17 @@ public class MulticastNetwork extends Ba
public MulticastNetwork() {
this.reliability = "simple";
}
-
+
public void doInit() throws Exception {
super.doInit();
- this.broadcastAddress = new InetSocketAddress(this.broadcastURI.getHost(),this.broadcastURI.getPort());
- if (this.managementURI!=null) {
- this.managementAddress = new InetSocketAddress(this.managementURI.getHost(),this.managementURI.getPort());
+ this.broadcastAddress = new InetSocketAddress(this.broadcastURI.getHost(), this.broadcastURI.getPort());
+ if (this.managementURI != null) {
+ this.managementAddress = new InetSocketAddress(this.managementURI.getHost(), this.managementURI.getPort());
}
}
/**
- * @param packet
- * @throws Exception
- * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.impl.processor.Packet)
+ * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.wire.Packet)
*/
public void downStreamManagement(Packet packet) throws Exception {
if (this.management != null) {
@@ -56,9 +54,7 @@ public class MulticastNetwork extends Ba
}
/**
- * @param packet
- * @throws Exception
- * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
+ * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.wire.Packet)
*/
public void downStream(Packet packet) throws Exception {
packet.setTo(this.broadcastAddress);
@@ -66,8 +62,6 @@ public class MulticastNetwork extends Ba
}
/**
- * @return
- * @throws Exception
* @see org.apache.activeblaze.impl.network.BaseNetwork#createManagementTransport()
*/
@Override
@@ -77,8 +71,6 @@ public class MulticastNetwork extends Ba
}
/**
- * @return
- * @throws Exception
* @see org.apache.activeblaze.impl.network.BaseNetwork#createTransport()
*/
@Override
@@ -88,7 +80,6 @@ public class MulticastNetwork extends Ba
}
/**
- * @return
* @see org.apache.activeblaze.impl.network.BaseNetwork#doCreateManagement()
*/
@Override
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java Fri Jul 19 18:44:21 2013
@@ -15,54 +15,47 @@
* limitations under the License.
*/
package org.apache.activeblaze.impl.network;
+
import java.net.URI;
+
import org.apache.activeblaze.impl.processor.ChainedProcessor;
/**
- * <P>
+ * <p/>
* A <CODE>Network</CODE> defines operations that can be applied to remote
* channel instances
- *
*/
-public interface Network extends ChainedProcessor{
-
+public interface Network extends ChainedProcessor {
+
/**
* @return the name
*/
- public String getName() ;
+ public String getName();
+
/**
* @param name the name to set
*/
public void setName(String name);
-
+
/**
* Set the uri for the <Code>Network</Code> to use
- * @param uri
- * @throws Exception
*/
public void setURI(URI uri) throws Exception;
-
+
/**
* Set the uri for the <Code>Network</Code> to use for management
- * @param uri
- * @throws Exception
*/
public void setManagementURI(URI uri) throws Exception;
-
+
/**
* Set the reliable protocol to use for this network
- * @param reliability
*/
public void setReliability(String reliability);
-
+
/**
* @return the reliability protocol used for this network
*/
public String getReliability();
-
-
-
-
-
+
}
\ No newline at end of file
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java Fri Jul 19 18:44:21 2013
@@ -18,25 +18,21 @@ package org.apache.activeblaze.impl.netw
import java.net.URI;
import java.util.Map;
+
import org.apache.activeblaze.util.ObjectFinder;
import org.apache.activeblaze.util.PropertyUtil;
/**
* create a new Network instance
- *
*/
public abstract class NetworkFactory {
private static final ObjectFinder OBJECT_FINDER = new ObjectFinder(
"META-INF/services/org/apache/activeblaze/network/");
/**
- * @param broadcast
- * @param management
- * @param reliability
* @return the network associated with the URI
- * @throws Exception
*/
- public static Network get(URI broadcast, URI management,String reliability) throws Exception {
+ public static Network get(URI broadcast, URI management, String reliability) throws Exception {
Network result = findNetwork(broadcast);
result.setURI(broadcast);
result.setReliability(reliability);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java Fri Jul 19 18:44:21 2013
@@ -16,20 +16,21 @@
*/
package org.apache.activeblaze.impl.network;
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.impl.transport.BaseTransport;
-import org.apache.activeblaze.impl.transport.TransportFactory;
-import org.apache.activeblaze.util.URISupport;
-import org.apache.activeblaze.util.URISupport.CompositeData;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.URISupport;
+import org.apache.activeblaze.util.URISupport.CompositeData;
+import org.apache.activeblaze.wire.Packet;
+
/**
* Uses a list of URI's to create a network
- *
*/
public class StaticNetwork extends BaseNetwork {
private InetSocketAddress localAddress;
@@ -48,8 +49,6 @@ public class StaticNetwork extends BaseN
}
/**
- * @param location
- * @throws Exception
* @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
*/
public void setManagementURI(URI location) throws Exception {
@@ -70,8 +69,6 @@ public class StaticNetwork extends BaseN
}
/**
- * @param location
- * @throws Exception
* @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
*/
public void setURI(URI location) throws Exception {
@@ -91,14 +88,12 @@ public class StaticNetwork extends BaseN
}
/**
- *
+ *
* @throws Exception
* @see org.apache.activeblaze.Service#init()
*/
/**
- * @param packet
- * @throws Exception
- * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.impl.processor.Packet)
+ * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.wire.Packet)
*/
public void downStreamManagement(Packet packet) throws Exception {
if (this.management != null) {
@@ -117,9 +112,7 @@ public class StaticNetwork extends BaseN
}
/**
- * @param packet
- * @throws Exception
- * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
+ * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.wire.Packet)
*/
public void downStream(Packet packet) throws Exception {
for (InetSocketAddress address : this.broadcastAddresses) {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html Fri Jul 19 18:44:21 2013
@@ -1,25 +1,25 @@
!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<html>
<head>
</head>
<body>
-A Transport that represents all reachable nodes.
+A Transport that represents all reachable nodes.
A <Code>Network</Code>Can be a multicast address, defined by a list or urls
or use a central location service to determine where to locate channels
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Fri Jul 19 18:44:21 2013
@@ -21,7 +21,6 @@ import org.apache.activeblaze.Processor;
/**
* Chains Processors together
- *
*/
public interface ChainedProcessor extends Processor {
/**
@@ -31,16 +30,11 @@ public interface ChainedProcessor extend
/**
* Set Next at the end of the chain
- *
- * @param next
- *
*/
public abstract void setEnd(Processor next);
/**
* Set the next
- *
- * @param next
*/
public abstract void setNext(Processor next);
@@ -51,14 +45,11 @@ public interface ChainedProcessor extend
/**
* Set the next chain
- *
- * @param p
*/
public abstract void setNextChain(ChainedProcessor p);
/**
- * @param prev
- * the prev to set
+ * @param prev the prev to set
*/
public abstract void setPrev(Processor prev);
@@ -68,8 +59,7 @@ public interface ChainedProcessor extend
public ExceptionListener getExceptionListener();
/**
- * @param maxPacketSize
- * the maxPacketSize to set
+ * @param maxPacketSize the maxPacketSize to set
*/
public void setMaxPacketSize(int maxPacketSize);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java Fri Jul 19 18:44:21 2013
@@ -21,12 +21,12 @@ import org.apache.activeblaze.BlazeConfi
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.ExceptionListener;
import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.wire.Packet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Default implementation of a ChainedProcessor
- *
*/
public class DefaultChainedProcessor extends BaseService implements ChainedProcessor {
private static final Log LOG = LogFactory.getLog(DefaultChainedProcessor.class);
@@ -47,7 +47,6 @@ public class DefaultChainedProcessor ext
}
/**
- * @param next
* @see org.apache.activeblaze.impl.processor.ChainedProcessor#setEnd(org.apache.activeblaze.Processor)
*/
public void setEnd(Processor next) {
@@ -68,7 +67,6 @@ public class DefaultChainedProcessor ext
}
/**
- * @param next
* @see org.apache.activeblaze.impl.processor.ChainedProcessor#setNext(org.apache.activeblaze.Processor)
*/
public void setNext(Processor next) {
@@ -84,7 +82,6 @@ public class DefaultChainedProcessor ext
}
/**
- * @param p
* @see org.apache.activeblaze.impl.processor.ChainedProcessor#setNextChain(org.apache.activeblaze.impl.processor.ChainedProcessor)
*/
public void setNextChain(ChainedProcessor p) {
@@ -105,7 +102,6 @@ public class DefaultChainedProcessor ext
}
/**
- * @param prev
* @see org.apache.activeblaze.impl.processor.ChainedProcessor#setPrev(org.apache.activeblaze.Processor)
*/
public void setPrev(Processor prev) {
@@ -138,9 +134,6 @@ public class DefaultChainedProcessor ext
/**
* Send a management packet - this may be on a different address
- *
- * @param packet
- * @throws Exception
*/
public void downStreamManagement(Packet packet) throws Exception {
if (this.next != null) {
@@ -212,8 +205,7 @@ public class DefaultChainedProcessor ext
}
/**
- * @param maxPacketSize
- * the maxPacketSize to set
+ * @param maxPacketSize the maxPacketSize to set
*/
public void setMaxPacketSize(int maxPacketSize) {
this.maxPacketSize = maxPacketSize;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<html>
<head>