You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC
svn commit: r1504961 [5/11] - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/
src/main/java/org/apache/activeblaze/cluster/
src/main/java/org/apache/activeblaze/group/
src/main/java/org/apache/activeblaze/impl/destination/ s...
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java Fri Jul 19 18:44:21 2013
@@ -16,28 +16,28 @@
*/
package org.apache.activeblaze.impl.reliable;
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.wire.Packet;
/**
* @author rajdavies
- *
*/
class ReceivedPacket extends ReliablePacket {
private final long timestamp;
+
/**
* Constructor
- * @param packet
*/
ReceivedPacket(Packet packet) {
super(packet);
- this.timestamp=System.currentTimeMillis();
+ this.timestamp = System.currentTimeMillis();
}
+
/**
* @return the timestamp
*/
public long getTimestamp() {
return this.timestamp;
}
-
+
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java Fri Jul 19 18:44:21 2013
@@ -20,11 +20,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.activeblaze.impl.processor.Packet;
+
+import org.apache.activeblaze.wire.Packet;
/**
* Holds a buffer of Packets to replay
- *
*/
public class ReliableBuffer {
Map<String, ReliablePacket> idMap = new HashMap<String, ReliablePacket>();
@@ -56,14 +56,12 @@ public class ReliableBuffer {
/**
* Add a Packet
- *
- * @param p
*/
public synchronized void addPacket(Packet p) {
ReliablePacket reliablePacket = new ReliablePacket(p);
if (this.idMap.put(p.getId(), reliablePacket) == null) {
this.sequenceMap.put(reliablePacket.getSequence(), reliablePacket);
- this.bufferSize += p.getPacketData().serializedSizeFramed();
+ this.bufferSize++;
if (this.root == null) {
this.root = reliablePacket;
} else {
@@ -75,14 +73,12 @@ public class ReliableBuffer {
/**
* Add packet in order
- *
- * @param p
*/
public synchronized void addPacketInOrder(Packet p) {
ReliablePacket reliablePacket = new ReliablePacket(p);
if (this.idMap.put(p.getId(), reliablePacket) == null) {
this.sequenceMap.put(reliablePacket.getSequence(), reliablePacket);
- this.bufferSize += p.getPacketData().serializedSizeFramed();
+ this.bufferSize++;
if (this.root == null) {
this.root = reliablePacket;
this.tail = reliablePacket;
@@ -141,8 +137,7 @@ public class ReliableBuffer {
/**
* Get a Packet from the buffer
- *
- * @param id
+ *
* @return the Packet
*/
public synchronized Packet getPacket(String id) {
@@ -152,8 +147,7 @@ public class ReliableBuffer {
/**
* Get a Packet from the buffer
- *
- * @param id
+ *
* @return the Packet
*/
public synchronized Packet getPacket(long id) {
@@ -163,8 +157,7 @@ public class ReliableBuffer {
/**
* Get the next Packet from the buffer
- *
- * @param p
+ *
* @return the next Packet from the buffer
*/
public synchronized Packet getNext(Packet p) {
@@ -174,9 +167,7 @@ public class ReliableBuffer {
/**
* Get a list of Packets from the buffer
- *
- * @param start
- * @param end
+ *
* @return the list of type <Code>Packet</Code>
*/
public synchronized List<Packet> getPackets(long start, long end) {
@@ -202,8 +193,6 @@ public class ReliableBuffer {
/**
* Remove a packet from the buffer
- *
- * @param p
*/
public synchronized void removePacket(Packet p) {
removePacket(p.getId());
@@ -211,9 +200,9 @@ public class ReliableBuffer {
/**
* Remove a packet from the buffer
- *
+ *
* @param id -
- * the id of the Packet
+ * the id of the Packet
*/
public synchronized void removePacket(String id) {
ReliablePacket reliablePacket = this.idMap.remove(id);
@@ -222,8 +211,6 @@ public class ReliableBuffer {
/**
* Remove a Packet from the buffer
- *
- * @param sequenceNumber
*/
public synchronized void removePacket(long sequenceNumber) {
ReliablePacket reliablePacket = this.sequenceMap.remove(new Long(sequenceNumber));
@@ -232,9 +219,6 @@ public class ReliableBuffer {
/**
* Remove Packets from the buffer
- *
- * @param start
- * @param end
*/
public synchronized void removePackets(long start, long end) {
ReliablePacket packet = this.root;
@@ -281,7 +265,7 @@ public class ReliableBuffer {
this.tail = (ReliablePacket) reliablePacket.getPrevious();
}
reliablePacket.unlink();
- this.bufferSize -= reliablePacket.getPacket().getPacketData().serializedSizeFramed();
+ this.bufferSize--;
}
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java Fri Jul 19 18:44:21 2013
@@ -17,6 +17,7 @@
package org.apache.activeblaze.impl.reliable;
import java.util.Map;
+
import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.util.ObjectFinder;
@@ -24,34 +25,31 @@ import org.apache.activeblaze.util.Prope
/**
* Find a reliable implementation
- *
*/
public class ReliableFactory {
-
+
private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/reliable/");
/**
- * @param location
* @return the configured transport from its URI
- * @throws Exception
*/
public static DefaultChainedProcessor get(String location) throws Exception {
- DefaultChainedProcessor result = findReliable(location);
+ DefaultChainedProcessor result = findReliable(location);
configure(result, location);
return result;
}
-
+
static void configure(ChainedProcessor transport, String location) throws Exception {
Map<String, String> options = PropertyUtil.parseParameters(location);
PropertyUtil.setProperties(transport, options);
}
-
+
private static DefaultChainedProcessor findReliable(String location) throws Exception {
- String scheme = PropertyUtil.stripBefore(location, '?');
- if (scheme == null) {
- throw new IllegalArgumentException("Reliability scheme not specified: [" + location + "]");
- }
- DefaultChainedProcessor result = (DefaultChainedProcessor) OBJECT_FINDER.newInstance(scheme);
- return result;
+ String scheme = PropertyUtil.stripBefore(location, '?');
+ if (scheme == null) {
+ throw new IllegalArgumentException("Reliability scheme not specified: [" + location + "]");
+ }
+ DefaultChainedProcessor result = (DefaultChainedProcessor) OBJECT_FINDER.newInstance(scheme);
+ return result;
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java Fri Jul 19 18:44:21 2013
@@ -16,12 +16,11 @@
*/
package org.apache.activeblaze.impl.reliable;
-import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.util.LinkedNode;
+import org.apache.activeblaze.wire.Packet;
/**
* Wrapper for a Packet
- *
*/
class ReliablePacket extends LinkedNode {
private final Packet packet;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<html>
<head>
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Jul 19 18:44:21 2013
@@ -17,13 +17,13 @@
package org.apache.activeblaze.impl.reliable.simple;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.wire.Packet;
+
/**
* Very basic (none) reliability
- *
*/
public class SimpleReliableProcessor extends DefaultChainedProcessor {
- int maxWindowSize = 64 * 1024;
+ int maxWindowSize = 1024;
int windowSize = 0;
int pauseTime = 0;
@@ -34,12 +34,10 @@ public class SimpleReliableProcessor ext
}
/**
- * @param p
- * @throws Exception
- * @see org.apache.activeblaze.impl.processor.DefaultChainedProcessor#downStream(org.apache.activeblaze.impl.processor.Packet)
+ * @see org.apache.activeblaze.impl.processor.DefaultChainedProcessor#downStream(org.apache.activeblaze.wire.Packet)
*/
public void downStream(Packet p) throws Exception {
- this.windowSize += p.getPacketData().serializedSizeFramed();
+ this.windowSize++;
if (this.windowSize >= this.maxWindowSize) {
Thread.sleep(this.pauseTime);
this.windowSize = 0;
@@ -55,8 +53,7 @@ public class SimpleReliableProcessor ext
}
/**
- * @param maxWindowSize
- * the maxWindowSize to set
+ * @param maxWindowSize the maxWindowSize to set
*/
public void setMaxWindowSize(int maxWindowSize) {
this.maxWindowSize = maxWindowSize;
@@ -70,8 +67,7 @@ public class SimpleReliableProcessor ext
}
/**
- * @param pauseTime
- * the pauseTime to set
+ * @param pauseTime the pauseTime to set
*/
public void setPauseTime(int pauseTime) {
this.pauseTime = pauseTime;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<html>
<head>
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Fri Jul 19 18:44:21 2013
@@ -22,24 +22,21 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.AckData.AckDataBean;
-import org.apache.activeblaze.wire.NackData.NackDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.Ack;
+import org.apache.activeblaze.wire.Nack;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Consumer part of SWP
- *
*/
public class ConsumerProcessor {
static final Log LOG = LogFactory.getLog(ConsumerProcessor.class);
private static final long NOT_SET = -1l;
- private int maxWindowSize = 16 * 1024;
+ private int maxWindowSize = 64;
private int rtt = 1000;
private long lastAckTime;
private long firstSequence = NOT_SET;
@@ -58,9 +55,8 @@ public class ConsumerProcessor {
}
void processInBound(Packet packet) throws Exception {
- PacketData packetData = packet.getPacketData();
- MessageType type = packetData.getMessageType();
- if (type == MessageType.CONTROL_DATA) {
+ int type = packet.getPacketType();
+ if (type == PacketType.CONTROL.getNumber()) {
if (this.replayBuffer.isEmpty()) {
// send back a control message
}
@@ -78,7 +74,7 @@ public class ConsumerProcessor {
this.firstSequence = sequence;
}
this.lastSequence = sequence;
- this.bufferSize += packet.getPacketData().serializedSizeFramed();
+ this.bufferSize++;
} finally {
this.lock.unlock();
}
@@ -109,22 +105,17 @@ public class ConsumerProcessor {
}
} else if (!packet.isReplayed() && !this.replayBuffer.isEmpty()) {
// request the sequence
- NackDataBean nack = new NackDataBean();
+ Nack nack = new Nack();
this.lock.lock();
try {
nack.setStartSequence(this.lastSequence + 1);
nack.setEndSequence(packet.getMessageSequence() - 1);
- nack.setSessionId(packet.getPacketData().getSessionId());
- nack.setId(this.ackSequence.incrementAndGet());
- PacketDataBean pd = new PacketDataBean();
- pd.setResponseRequired(false);
- pd.setPayload(nack.freeze().toUnframedBuffer());
- pd.setMessageType(MessageType.NACK_DATA);
- Packet nackPacket = new Packet(pd.freeze());
- nackPacket.setTo(this.peerAddress);
- this.swp.sendDownStream(nackPacket);
+ nack.setMessageSequence(this.ackSequence.incrementAndGet());
+ nack.setResponseRequired(false);
+ nack.setTo(this.peerAddress);
+ this.swp.sendDownStream(nack);
LOG.debug(this + " Sending Nack: " + nack.getStartSequence() + " , " + nack.getEndSequence());
- this.lastAck = nackPacket;
+ this.lastAck = nack;
} finally {
this.lock.unlock();
}
@@ -135,10 +126,7 @@ public class ConsumerProcessor {
}
/**
- *
- * @param timeStamp
* @return if still valid
- * @throws Exception
*/
boolean control(long timeStamp) throws Exception {
boolean result = false;
@@ -153,18 +141,15 @@ public class ConsumerProcessor {
this.lock.lock();
try {
this.bufferSize = 0;
- AckDataBean ack = new AckDataBean();
+ Ack ack = new Ack();
ack.setStartSequence(this.firstSequence);
ack.setEndSequence(this.lastSequence);
- ack.setId(this.ackSequence.incrementAndGet());
- PacketDataBean pd = new PacketDataBean();
- pd.setResponseRequired(false);
- pd.setPayload(ack.freeze().toUnframedBuffer());
- pd.setMessageType(MessageType.ACK_DATA);
- ackPacket = new Packet(pd.freeze());
- ackPacket.setTo(this.peerAddress);
+ ack.setMessageSequence(this.ackSequence.incrementAndGet());
+ ack.setResponseRequired(false);
+ ack.setTo(this.peerAddress);
this.lastAckTime = System.currentTimeMillis();
this.firstSequence = this.lastSequence;
+ ackPacket = ack;
LOG.debug(this + " Sent Ack " + ack);
} finally {
this.lock.unlock();
@@ -173,6 +158,10 @@ public class ConsumerProcessor {
this.lastAck = ackPacket;
}
+ /**
+ * @return String
+ * @see java.lang.Object#toString()
+ */
public String toString() {
return "ConsumerProcessor(" + this.peerAddress + ")";
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Fri Jul 19 18:44:21 2013
@@ -25,53 +25,42 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activeblaze.BlazeNoRouteException;
-import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.AckData.AckDataBuffer;
-import org.apache.activeblaze.wire.ControlData.ControlDataBean;
-import org.apache.activeblaze.wire.NackData.NackDataBuffer;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.Ack;
+import org.apache.activeblaze.wire.Control;
+import org.apache.activeblaze.wire.Nack;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
/**
* state on a request
- *
*/
public class ProducerProcessor {
static final Log LOG = LogFactory.getLog(ProducerProcessor.class);
private static final long NOT_SET = -1l;
- private int maxWindowSize = 32 * 1024;
+ private int maxWindowSize = 1024;
private int rtt = 5000;
private ReliableBuffer replayBuffer = new ReliableBuffer();
private final Lock lock = new ReentrantLock();
private final Condition full = this.lock.newCondition();
private final SwpProcessor swp;
private final SocketAddress peerAddress;
- private final int sessionId;
private long lastAckId = NOT_SET;
private long lastAckTime = NOT_SET;
private AtomicLong sendSequence = new AtomicLong(1);
- ProducerProcessor(SwpProcessor swp, SocketAddress peerAddress, int sessionId) {
+ ProducerProcessor(SwpProcessor swp, SocketAddress peerAddress) {
this.swp = swp;
this.peerAddress = peerAddress;
- this.sessionId = sessionId;
}
/**
* blocks until it can send the packet
- *
- * @param packet
- * @throws BlazeNoRouteException
- *
*/
void processOutbound(final Packet packet) throws BlazeNoRouteException {
- PacketDataBean bean = packet.getPacketData().copy();
- bean.setSessionId(this.sessionId);
- bean.setMessageSequence(this.sendSequence.incrementAndGet());
- packet.setPacketData(bean.freeze());
+ packet.setMessageSequence(this.sendSequence.incrementAndGet());
this.lock.lock();
try {
this.replayBuffer.addPacket(packet);
@@ -82,11 +71,11 @@ public class ProducerProcessor {
if (windowSize >= this.maxWindowSize) {
if (!this.full.await(this.rtt, TimeUnit.MILLISECONDS)) {
this.replayBuffer.clear();
- throw new BlazeNoRouteException("No route to "+packet.getTo());
+ throw new BlazeNoRouteException("No route to " + packet.getTo());
}
}
} catch (InterruptedException e) {
- //ignore - we are shutting down
+ // ignore - we are shutting down
} finally {
this.lock.unlock();
}
@@ -94,23 +83,23 @@ public class ProducerProcessor {
Packet processInbound(Packet packet) throws Exception {
Packet result = null;
- PacketData data = packet.getPacketData();
- if (data != null) {
- MessageType type = data.getMessageType();
- if (type == MessageType.ACK_DATA) {
- AckDataBuffer ackData = AckDataBuffer.parseUnframed(data.getPayload());
- long start = ackData.getStartSequence();
- long end = ackData.getEndSequence();
+
+ if (packet != null) {
+ int type = packet.getPacketType();
+ if (type == PacketType.ACK.getNumber()) {
+ Ack ack = (Ack) packet;
+ long start = ack.getStartSequence();
+ long end = ack.getEndSequence();
if (LOG.isDebugEnabled()) {
- LOG.debug(this + " Got Ack = " + ackData.getId() + ": " + ackData.getStartSequence() + ","
- + ackData.getEndSequence() + " [" + this.replayBuffer.size() + "]");
+ LOG.debug(this + " Got Ack = " + ack.getId() + ": " + ack.getStartSequence() + ","
+ + ack.getEndSequence() + " [" + this.replayBuffer.size() + "]");
}
this.replayBuffer.removePackets(start, end);
if (LOG.isDebugEnabled()) {
- LOG.debug(this + " Processed Ack = " + ackData.getId() + ": " + ackData.getStartSequence() + ","
- + ackData.getEndSequence() + " [" + this.replayBuffer.size() + "]");
+ LOG.debug(this + " Processed Ack = " + ack.getId() + ": " + ack.getStartSequence() + ","
+ + ack.getEndSequence() + " [" + this.replayBuffer.size() + "]");
}
- this.lastAckId = ackData.getId();
+ this.lastAckId = ack.getMessageSequence();
this.lastAckTime = System.currentTimeMillis();
if (this.replayBuffer.getBufferSize() <= this.maxWindowSize) {
this.lock.lock();
@@ -120,14 +109,14 @@ public class ProducerProcessor {
this.lock.unlock();
}
}
- } else if (type == MessageType.NACK_DATA) {
+ } else if (type == PacketType.NACK.getNumber()) {
this.lastAckTime = System.currentTimeMillis();
- NackDataBuffer nackData = NackDataBuffer.parseUnframed(data.getPayload());
- this.lastAckId = nackData.getId();
- LOG.debug(this + " Got Nack = " + nackData);
+ Nack nack = (Nack) packet;
+ this.lastAckId = nack.getMessageSequence();
+ LOG.debug(this + " Got Nack = " + nack);
// lookup any missed messages
- long start = nackData.getStartSequence();
- long end = nackData.getEndSequence();
+ long start = nack.getStartSequence();
+ long end = nack.getEndSequence();
List<Packet> list = this.replayBuffer.getPackets(start, end);
LOG.debug(this + " Replaying " + list);
for (Packet p : list) {
@@ -141,31 +130,24 @@ public class ProducerProcessor {
}
/**
- *
- * @param timeStamp
* @return true if still valid
- * @throws Exception
*/
boolean control(long timeStamp) throws Exception {
boolean result = false;
if ((this.lastAckTime + (this.rtt / 2)) < timeStamp) {
// send a control message
- Packet ackPacket = null;
+ Control control = new Control();
this.lock.lock();
try {
- ControlDataBean control = new ControlDataBean();
- control.setLastId(this.lastAckId);
- PacketDataBean pd = new PacketDataBean();
- pd.setResponseRequired(false);
- pd.setPayload(control.freeze().toUnframedBuffer());
- pd.setMessageType(MessageType.CONTROL_DATA);
- ackPacket = new Packet(pd.freeze());
- ackPacket.setTo(this.peerAddress);
- LOG.debug(this + " Sent Control message " + control);
+
+ control.setLastMessageSequence(this.lastAckId);
+ control.setTo(this.peerAddress);
+
+ LOG.debug(this + " Sending Control message " + control);
} finally {
this.lock.unlock();
}
- this.swp.sendDownStream(ackPacket);
+ this.swp.sendDownStream(control);
} else if (this.lastAckTime + (this.rtt * 2) < timeStamp) {
// no longer valid
LOG.debug(this + " Not valid: Last AckTime " + this.lastAckTime + " , " + this.rtt + " , " + timeStamp);
@@ -174,6 +156,10 @@ public class ProducerProcessor {
return result;
}
+ /**
+ * @return String
+ * @see java.lang.Object#toString()
+ */
public String toString() {
return "ProducerProcessor(" + this.peerAddress + ")";
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java Fri Jul 19 18:44:21 2013
@@ -23,21 +23,20 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.SendRequest;
-import org.apache.activemq.protobuf.Buffer;
+import org.apache.activeblaze.wire.Packet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This is a sliding window protocol for unicast reliability
- *
*/
public class SwpProcessor extends DefaultChainedProcessor {
static final Log LOG = LogFactory.getLog(SwpProcessor.class);
- private Map<Buffer, SendRequest> messageRequests;
+ private Map<Packet, SendRequest> messageRequests;
private int maxConcurrentRequests = 1000;
private int maxWindowSize = 16 * 1024;
private int windowSize = 0;
@@ -71,7 +70,7 @@ public class SwpProcessor extends Defaul
public void doInit() throws Exception {
super.doInit();
- this.messageRequests = new LRUCache<Buffer, SendRequest>(this.maxConcurrentRequests);
+ this.messageRequests = new LRUCache<Packet, SendRequest>(this.maxConcurrentRequests);
}
public void doStart() throws Exception {
@@ -111,14 +110,15 @@ public class SwpProcessor extends Defaul
// Make sure we shutdown the timer before shutting down the down stream
// processors to avoid the timer getting errors.
final CountDownLatch done = new CountDownLatch(1);
- this.statusTimer.schedule(new TimerTask(){
+ this.statusTimer.schedule(new TimerTask() {
@Override
public void run() {
statusTimer.cancel();
done.countDown();
- }}, 0);
+ }
+ }, 0);
done.await();
- this.statusTimer=null;
+ this.statusTimer = null;
}
super.doStop();
}
@@ -134,15 +134,14 @@ public class SwpProcessor extends Defaul
/**
* @return the messageRequests
*/
- public Map<Buffer, SendRequest> getMessageRequests() {
+ public Map<Packet, SendRequest> getMessageRequests() {
return this.messageRequests;
}
/**
- * @param messageRequests
- * the messageRequests to set
+ * @param messageRequests the messageRequests to set
*/
- public void setMessageRequests(Map<Buffer, SendRequest> messageRequests) {
+ public void setMessageRequests(Map<Packet, SendRequest> messageRequests) {
this.messageRequests = messageRequests;
}
@@ -154,8 +153,7 @@ public class SwpProcessor extends Defaul
}
/**
- * @param maxConcurrentRequests
- * the maxConcurrentRequests to set
+ * @param maxConcurrentRequests the maxConcurrentRequests to set
*/
public void setMaxConcurrentRequests(int maxConcurrentRequests) {
this.maxConcurrentRequests = maxConcurrentRequests;
@@ -169,8 +167,7 @@ public class SwpProcessor extends Defaul
}
/**
- * @param maxWindowSize
- * the maxWindowSize to set
+ * @param maxWindowSize the maxWindowSize to set
*/
public void setMaxWindowSize(int maxWindowSize) {
this.maxWindowSize = maxWindowSize;
@@ -184,8 +181,7 @@ public class SwpProcessor extends Defaul
}
/**
- * @param windowSize
- * the windowSize to set
+ * @param windowSize the windowSize to set
*/
public void setWindowSize(int windowSize) {
this.windowSize = windowSize;
@@ -196,7 +192,7 @@ public class SwpProcessor extends Defaul
synchronized (this.producers) {
result = this.producers.get(peer);
if (result == null) {
- result = new ProducerProcessor(this, peer, this.session.incrementAndGet());
+ result = new ProducerProcessor(this, peer);
this.producers.put(peer, result);
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<html>
<head>
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Fri Jul 19 18:44:21 2013
@@ -16,46 +16,40 @@
*/
package org.apache.activeblaze.impl.transport;
+import java.net.SocketAddress;
import java.net.URI;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.impl.processor.PacketAudit;
-import org.apache.activemq.protobuf.Buffer;
+
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.BufferInputStream;
+import org.apache.activeblaze.wire.BufferOutputStream;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketAudit;
+import org.apache.activeblaze.wire.PacketType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Base Class for transports
- *
*/
public abstract class BaseTransport extends ThreadChainedProcessor {
private static final Log LOG = LogFactory.getLog(BaseTransport.class);
-
+ protected static final short MAGIC = 0xFAB;
static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
private URI localURI;
-
private Buffer bufferOfLocalURI;
-
private int bufferSize = DEFAULT_BUFFER_SIZE;
-
private int soTimeout = 2000;
-
private int timeToLive = 1;
-
private boolean loopBack = false;
-
protected final PacketAudit audit = new PacketAudit();
-
private boolean broadcast = true;
-
private boolean enableAudit = false;
-
private int maxDispatchQueueSize = 10000;
-
private LinkedBlockingQueue<Packet> dispatchQueue;
-
private Thread dispatchQueueThread;
+ private BufferOutputStream bufferOut = new BufferOutputStream(1024);
+ private byte[] intBuffer = new byte[4];
public void doInit() throws Exception {
super.doInit();
@@ -63,8 +57,7 @@ public abstract class BaseTransport exte
if (this.localURI != null) {
this.bufferOfLocalURI = new Buffer(this.localURI.toString());
}
- this.dispatchQueue = new LinkedBlockingQueue<Packet>(
- getMaxDispatchQueueSize());
+ this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
}
public void doShutDown() throws Exception {
@@ -82,8 +75,7 @@ public abstract class BaseTransport exte
}
}
};
- this.dispatchQueueThread = new Thread(runable, getLocalURI()
- + "-DispatchQueue");
+ this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
this.dispatchQueueThread.setDaemon(true);
this.dispatchQueueThread.start();
}
@@ -108,8 +100,7 @@ public abstract class BaseTransport exte
}
/**
- * @param localURI
- * the localURI to set
+ * @param localURI the localURI to set
*/
public void setLocalURI(URI localURI) {
this.localURI = localURI;
@@ -126,8 +117,7 @@ public abstract class BaseTransport exte
}
/**
- * @param bufferSize
- * the bufferSize to set
+ * @param bufferSize the bufferSize to set
*/
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
@@ -141,8 +131,7 @@ public abstract class BaseTransport exte
}
/**
- * @param soTimeout
- * the soTimeout to set
+ * @param soTimeout the soTimeout to set
*/
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
@@ -156,8 +145,7 @@ public abstract class BaseTransport exte
}
/**
- * @param timeToLive
- * the timeToLive to set
+ * @param timeToLive the timeToLive to set
*/
public void setTimeToLive(int timeToLive) {
this.timeToLive = timeToLive;
@@ -171,8 +159,7 @@ public abstract class BaseTransport exte
}
/**
- * @param loopBack
- * the loopBack to set
+ * @param loopBack the loopBack to set
*/
public void setLoopBack(boolean loopBack) {
this.loopBack = loopBack;
@@ -193,8 +180,7 @@ public abstract class BaseTransport exte
}
/**
- * @param broadcast
- * the broadcast to set
+ * @param broadcast the broadcast to set
*/
public void setBroadcast(boolean broadcast) {
this.broadcast = broadcast;
@@ -208,8 +194,7 @@ public abstract class BaseTransport exte
}
/**
- * @param enableAudit
- * the enableAudit to set
+ * @param enableAudit the enableAudit to set
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
@@ -222,9 +207,10 @@ public abstract class BaseTransport exte
return this.bufferOfLocalURI;
}
- public String toString() {
- return this.localURI != null ? this.localURI.toString()
- : " Uninitialized Transport";
+ public final String toString() {
+ String str = "" + System.identityHashCode(this) + ": ";
+ str += this.localURI != null ? this.localURI.toString() : " Uninitialized Transport";
+ return str;
}
/**
@@ -235,8 +221,7 @@ public abstract class BaseTransport exte
}
/**
- * @param maxDispatchQueueSize
- * the maxDispatchQueueSize to set
+ * @param maxDispatchQueueSize the maxDispatchQueueSize to set
*/
public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
this.maxDispatchQueueSize = maxDispatchQueueSize;
@@ -244,7 +229,13 @@ public abstract class BaseTransport exte
public void upStream(Packet packet) throws Exception {
if (!isStopped()) {
- this.dispatchQueue.put(packet);
+ if (!this.enableAudit || !this.audit.isDuplicate(packet)) {
+ this.dispatchQueue.put(packet);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(toString() + " Ignoring duplicate packet: " + packet);
+ }
+ }
}
}
@@ -258,7 +249,7 @@ public abstract class BaseTransport exte
} catch (InterruptedException e1) {
// we've stopped
} catch (Exception e) {
- String value="";
+ String value = "";
try {
value = packet.toString();
} catch (Throwable ignore) {
@@ -267,4 +258,55 @@ public abstract class BaseTransport exte
stopInternal();
}
}
+
+ public final synchronized void downStream(Packet packet) throws Exception {
+ if (isInitialized()) {
+ if (isEnableAudit()) {
+ // add to audit
+ this.audit.isDuplicate(packet);
+ }
+ this.bufferOut.reset();
+ this.bufferOut.writeShort(MAGIC);
+ this.bufferOut.skip(4);
+ this.bufferOut.writeByte(packet.getPacketType());
+ packet.write(this.bufferOut);
+ int len = this.bufferOut.length() - 6;
+ this.intBuffer[0] = (byte) (len >>> 24);
+ this.intBuffer[1] = (byte) (len >>> 16);
+ this.intBuffer[2] = (byte) (len >>> 8);
+ this.intBuffer[3] = (byte) (len >>> 0);
+ System.arraycopy(this.intBuffer, 0, this.bufferOut.getBuffer(), this.bufferOut.getOffset() + 2,
+ this.intBuffer.length);
+ SocketAddress to = packet.getTo();
+ sendData(to, this.bufferOut.getBuffer(), this.bufferOut.getOffset(), this.bufferOut.length());
+ }
+ }
+
+ protected final void processData(SocketAddress from, Buffer buffer) throws Exception {
+ Packet packet = buildPacket(from, buffer);
+ upStream(packet);
+ }
+
+ protected final Packet buildPacket(SocketAddress from, Buffer buffer) throws Exception {
+ Packet result = null;
+ BufferInputStream in = new BufferInputStream(buffer);
+ short magic = in.readShort();
+ if (magic == MAGIC) {
+ int len = in.readInt();
+ int type = in.readByte();
+ Packet packet = PacketType.valueOf(type).createPacket();
+ packet.read(in);
+ packet.setFrom(from);
+ result = packet;
+ } else {
+ LOG.warn("Bad Packet magic " + magic);
+ }
+ return result;
+ /*
+ * result = IOUtils.readPacket(buffer); result.setFrom(from); return
+ * result;
+ */
+ }
+
+ protected abstract void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception;
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Fri Jul 19 18:44:21 2013
@@ -16,124 +16,148 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
+import java.net.StandardProtocolFamily;
+import java.net.StandardSocketOptions;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.MembershipKey;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
+import org.apache.activeblaze.wire.Buffer;
/**
* Multicast transport
- *
*/
public class MulticastTransport extends BaseTransport {
- private MulticastSocket socket;
- private String networkInterface;
+ DatagramChannel datagramChannel;
+ ByteBuffer receiveBuffer;
+ private String networkInterfaceName;
private InetSocketAddress socketAddress;
+ private MembershipKey membershipKey;
+ private NetworkInterface networkInterface;
+ private Map<String, MembershipKey> membershipKeyMap = new ConcurrentHashMap<String, MembershipKey>();
+
public void doInit() throws Exception {
super.doInit();
- this.socket = new MulticastSocket(getLocalURI().getPort());
- this.socket.setTimeToLive(getTimeToLive());
- this.socket.setLoopbackMode(isLoopBack());
- this.socket.setSoTimeout(getSoTimeout());
- this.socket.setReceiveBufferSize(getBufferSize());
- this.socket.setSendBufferSize(getBufferSize());
this.socketAddress = new InetSocketAddress(InetAddress.getByName(getLocalURI().getHost()), getLocalURI()
.getPort());
- NetworkInterface ni = null;
- if (getNetworkInterface() != null && getNetworkInterface().length() > 0) {
- ni = NetworkInterface.getByName(getNetworkInterface());
- if (ni == null) {
- throw new BlazeException("Couldn't find an network interface named " + getNetworkInterface());
- }
- }
- if (ni != null) {
- this.socket.joinGroup(this.socketAddress, ni);
+
+ if (getNetworkInterfaceName() != null && getNetworkInterfaceName().isEmpty()) {
+ networkInterface = NetworkInterface.getByName(getNetworkInterfaceName());
} else {
- this.socket.joinGroup(this.socketAddress.getAddress());
+ networkInterface = NetworkInterface.getNetworkInterfaces().nextElement();
+ networkInterfaceName = networkInterface.getName();
+ }
+
+ if (networkInterface == null) {
+ throw new BlazeException("Couldn't find an network interface named " + getNetworkInterfaceName());
+ }
+ try {
+ receiveBuffer = ByteBuffer.allocateDirect(getBufferSize());
+ datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
+ if (datagramChannel.isOpen()) {
+ datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, getBufferSize());
+ datagramChannel.setOption(StandardSocketOptions.SO_SNDBUF, getBufferSize());
+
+ datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ datagramChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface);
+
+ InetAddress group = InetAddress.getByName(getLocalURI().getHost());
+ if (!group.isMulticastAddress()) {
+ throw new BlazeException(getLocalURI().getHost() + " is not a multicast address");
+ }
+
+ datagramChannel.bind(new InetSocketAddress(getLocalURI().getPort()));
+ membershipKey = datagramChannel.join(group, networkInterface);
+ // so need to reset the local uri
+ URI oldURI = getLocalURI();
+ URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), ((InetSocketAddress) datagramChannel.getLocalAddress()).getPort(), oldURI
+ .getPath(), oldURI.getQuery(), oldURI.getFragment());
+ setLocalURI(newURI);
+ }
+ } catch (Exception e) {
+ throw new BlazeException("Could not open Datagram channel ", e);
}
}
public void doShutDown() throws Exception {
super.doShutDown();
- if (this.socket != null) {
- this.socket.close();
+
+ for (MembershipKey key : membershipKeyMap.values()) {
+ key.drop();
+ }
+ membershipKeyMap.clear();
+ if (membershipKey != null) {
+ membershipKey.drop();
+ }
+ if (this.datagramChannel != null) {
+ this.datagramChannel.close();
}
}
+
protected void doProcess() throws Exception {
- if (isInitialized()) {
- byte[] receiveData = new byte[getMaxPacketSize()];
- DatagramPacket dp = new DatagramPacket(receiveData, receiveData.length);
- this.socket.receive(dp);
- if (dp.getLength() > 0) {
- PacketDataBuffer data = PacketDataBuffer.parseFramed(dp.getData());
- SocketAddress address = dp.getSocketAddress();
- Packet packet = new Packet(address, data);
- if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
- upStream(packet);
- }
- }
+ SocketAddress socketAddress = datagramChannel.receive(receiveBuffer);
+ receiveBuffer.flip();
+ if (receiveBuffer.limit() > 0) {
+ Buffer buffer = new Buffer(receiveBuffer);
+ processData(socketAddress, buffer);
}
+ receiveBuffer.clear();
}
- public synchronized void downStream(Packet packet) throws Exception {
+ public void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception {
if (isInitialized()) {
- if (isEnableAudit()) {
- // add to audit
- this.audit.isDuplicate(packet);
- }
- byte[] data = packet.getPacketData().toFramedByteArray();
- SocketAddress to = packet.getTo();
- DatagramPacket dp = new DatagramPacket(data, data.length, to);
- this.socket.send(dp);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data, offset, length);
+ this.datagramChannel.send(byteBuffer, to);
}
}
/**
- * @return the networkInterface
+ * @return the networkInterfaceName
*/
- public String getNetworkInterface() {
- return this.networkInterface;
+ public String getNetworkInterfaceName() {
+ return this.networkInterfaceName;
}
/**
- * @param networkInterface
- * the networkInterface to set
+ * @param networkInterfaceName the networkInterfaceName to set
*/
- public void setNetworkInterface(String networkInterface) {
- this.networkInterface = networkInterface;
+ public void setNetworkInterfaceName(String networkInterfaceName) {
+ this.networkInterfaceName = networkInterfaceName;
}
/**
* join a multicast group
- *
- * @param address
- * @throws Exception
*/
public void joinGroup(String address) throws Exception {
if (isInitialized()) {
InetAddress group = InetAddress.getByName(address);
- this.socket.joinGroup(group);
+ if (!group.isMulticastAddress()) {
+ throw new BlazeException(address + " is not a multicast address");
+ }
+ MembershipKey key = datagramChannel.join(group, networkInterface);
+ membershipKeyMap.put(address, key);
}
}
/**
* leave a multicast group
- *
- * @param address
- * @throws Exception
*/
public void leaveGroup(String address) throws Exception {
if (isInitialized()) {
- InetAddress group = InetAddress.getByName(address);
- this.socket.leaveGroup(group);
+ MembershipKey key = membershipKeyMap.remove(address);
+ if (key != null) {
+ key.drop();
+ }
}
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java Fri Jul 19 18:44:21 2013
@@ -17,13 +17,13 @@
package org.apache.activeblaze.impl.transport;
import java.net.SocketTimeoutException;
+
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Thread associated with processing
- *
*/
public abstract class ThreadChainedProcessor extends DefaultChainedProcessor implements Runnable {
private static final Log LOG = LogFactory.getLog(ThreadChainedProcessor.class);
@@ -74,8 +74,6 @@ public abstract class ThreadChainedProce
/**
* Process input for the Processor
- *
- * @throws Exception
*/
protected abstract void doProcess() throws Exception;
@@ -87,8 +85,7 @@ public abstract class ThreadChainedProce
}
/**
- * @param priority
- * the priority to set
+ * @param priority the priority to set
*/
public void setPriority(int priority) {
this.priority = priority;
@@ -102,8 +99,7 @@ public abstract class ThreadChainedProce
}
/**
- * @param daemon
- * the daemon to set
+ * @param daemon the daemon to set
*/
public void setDaemon(boolean daemon) {
this.daemon = daemon;
@@ -117,8 +113,7 @@ public abstract class ThreadChainedProce
}
/**
- * @param name
- * the name to set
+ * @param name the name to set
*/
public void setName(String name) {
this.name = name;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java Fri Jul 19 18:44:21 2013
@@ -18,41 +18,39 @@ package org.apache.activeblaze.impl.tran
import java.net.URI;
import java.util.Map;
+
import org.apache.activeblaze.util.ObjectFinder;
import org.apache.activeblaze.util.PropertyUtil;
/**
* Find a Transport from a URI scheme
- *
*/
public abstract class TransportFactory {
private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/transport/");
/**
- * @param location
* @return the configured transport from its URI
- * @throws Exception
*/
public static BaseTransport get(URI location) throws Exception {
- BaseTransport result = findTransport(location);
+ BaseTransport result = findTransport(location);
result.setLocalURI(location);
configureTransport(result, location);
return result;
}
-
+
static void configureTransport(BaseTransport transport, URI uri) throws Exception {
Map<String, String> options = PropertyUtil.parseParameters(uri);
PropertyUtil.setProperties(transport, options);
}
-
+
private static BaseTransport findTransport(URI location) throws Exception {
- String scheme = location.getScheme();
- if (scheme == null) {
- throw new IllegalArgumentException("Transport scheme not specified: [" + location + "]");
- }
- BaseTransport result = (BaseTransport) OBJECT_FINDER.newInstance(scheme);
- return result;
+ String scheme = location.getScheme();
+ if (scheme == null) {
+ throw new IllegalArgumentException("Transport scheme not specified: [" + location + "]");
+ }
+ BaseTransport result = (BaseTransport) OBJECT_FINDER.newInstance(scheme);
+ return result;
}
}
\ No newline at end of file
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Fri Jul 19 18:44:21 2013
@@ -16,161 +16,99 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.net.StandardProtocolFamily;
+import java.net.StandardSocketOptions;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Map;
import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.BlazeNoRouteException;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.util.IOUtils;
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.AckData.AckDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
+import org.apache.activeblaze.wire.Ack;
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.Packet;
/**
* UdpTransport
- *
*/
public class UdpTransport extends BaseTransport {
- private DatagramChannel channel;
+ DatagramChannel datagramChannel;
+ ByteBuffer receiveBuffer;
+ private Map<Packet, SendRequest> messageRequests = new LRUCache<Packet, SendRequest>(5000);
- private ByteBuffer inBuffer;
-
- private ByteBuffer outBuffer;
-
- private Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
- 1000);
public void doInit() throws Exception {
super.doInit();
- this.channel = DatagramChannel.open();
- DatagramSocket socket = this.channel.socket();
- SocketAddress address = null;
+ InetSocketAddress address = null;
if (getLocalURI() != null) {
- address = new InetSocketAddress(getLocalURI().getHost(),
- getLocalURI().getPort());
+ address = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort());
} else {
throw new BlazeException("localURI not set");
}
- socket.setBroadcast(isBroadcast());
- socket.setReceiveBufferSize(getBufferSize());
- socket.setSendBufferSize(getBufferSize());
- socket.setSoTimeout(getSoTimeout());
- this.channel.configureBlocking(true);
- socket.bind(address);
- // if the port was 0 - the port will be allocated automatically -
- // so need to reset the local uri
- URI oldURI = getLocalURI();
- URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI
- .getHost(), socket.getLocalPort(), oldURI.getPath(), oldURI
- .getQuery(), oldURI.getFragment());
- setLocalURI(newURI);
- this.inBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
- this.outBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
+ try {
+ receiveBuffer = ByteBuffer.allocateDirect(getBufferSize());
+ datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
+ if (datagramChannel.isOpen()) {
+ datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, getBufferSize());
+ datagramChannel.setOption(StandardSocketOptions.SO_SNDBUF, getBufferSize());
+ datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ datagramChannel.bind(address);
+ // so need to reset the local uri
+ URI oldURI = getLocalURI();
+ URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), ((InetSocketAddress) datagramChannel.getLocalAddress()).getPort(), oldURI
+ .getPath(), oldURI.getQuery(), oldURI.getFragment());
+ setLocalURI(newURI);
+ }
+ } catch (Exception e) {
+ throw new BlazeException("Could not open Datagram channel for " + getLocalURI(), e);
+ }
}
public void doShutDown() throws Exception {
super.doShutDown();
- if (this.channel != null) {
- this.channel.close();
- this.inBuffer = null;
- this.outBuffer = null;
- this.channel = null;
+ if (this.datagramChannel != null) {
+ this.datagramChannel.close();
}
}
protected void doProcess() throws Exception {
- this.inBuffer.clear();
- SocketAddress address = this.channel.receive(this.inBuffer);
- ByteBuffer buffer = this.inBuffer;
- if (isInitialized()) {
- buffer.flip();
- while (buffer.remaining() > 0) {
- InputStream stream = IOUtils.getByteBufferInputStream(buffer);
- PacketDataBuffer data = PacketDataBuffer.parseFramed(stream);
- stream.close();
- if (data.getResponse()) {
- synchronized (this.messageRequests) {
- SendRequest<PacketDataBuffer> request = this.messageRequests.remove(data.getCorrelationId());
- if (request != null) {
- request.put(data.getMessageId(), data);
- }
- }
- }
- if (data.getResponseRequired()) {
- Packet packet = createAckPacket(data);
- packet.setTo(address);
- downStream(packet);
- }
- Packet packet = new Packet(address, data);
- if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
- upStream(packet);
+ SocketAddress socketAddress = datagramChannel.receive(receiveBuffer);
+ receiveBuffer.flip();
+ if (receiveBuffer.limit() > 0) {
+ Buffer buffer = new Buffer(receiveBuffer);
+ Packet packet = buildPacket(socketAddress, buffer);
+ if (packet.isResponse()) {
+ SendRequest request = this.messageRequests.remove(packet.getCorrelationId());
+ if (request != null) {
+ request.put(packet.getCorrelationId(), packet);
}
}
- buffer.clear();
+ if (packet.isResponseRequired()) {
+ Packet ack = createAckPacket(packet);
+ ack.setTo(socketAddress);
+ downStream(ack);
+ }
+ upStream(packet);
}
+ receiveBuffer.clear();
}
- public void downStream(Packet packet) throws Exception {
- ByteBuffer buffer = this.outBuffer;
- if (isStarted()) {
- SendRequest<PacketDataBuffer> request = null;
- if (packet.isResponseRequired()) {
- synchronized (this.messageRequests) {
- request = new SendRequest<PacketDataBuffer>();
- this.messageRequests.put(packet.getPacketData().getMessageId(), request);
- }
- }
- synchronized (buffer) {
- buffer.clear();
- OutputStream stream = IOUtils.getByteBufferOutputStream(buffer);
- if (isEnableAudit()) {
- // add to audit
- this.audit.isDuplicate(packet);
- }
- packet.getPacketData().writeFramed(stream);
- stream.close();
- buffer.flip();
- this.channel.send(buffer, packet.getTo());
- }
- if (request != null) {
- if (request.get(getSoTimeout()) == null) {
- throw new BlazeNoRouteException("No response in "
- + getSoTimeout() + " ms from " + packet.getTo());
- }
- }
- } else {
- if (!shutDown()) {
- throw new BlazeException(this + " Not started - cannot send "
- + packet);
- }
+ public void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception {
+ if (isInitialized()) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data, offset, length);
+ this.datagramChannel.send(byteBuffer, to);
}
}
- private Packet createAckPacket(PacketData data) {
- AckDataBean ackData = new AckDataBean();
- ackData.setSessionId(data.getSessionId());
- ackData.setStartSequence(data.getMessageSequence());
- ackData.setEndSequence(data.getMessageSequence());
- PacketDataBean pd = new PacketDataBean();
- pd.setResponseRequired(false);
- pd.setCorrelationId(data.getMessageId());
- pd.setResponse(true);
- pd.setPayload(ackData.freeze().toUnframedBuffer());
- pd.setMessageType(MessageType.ACK_DATA);
- Packet packet = new Packet(pd.freeze());
- return packet;
+ private Packet createAckPacket(Packet packet) {
+ Ack ack = new Ack();
+ ack.setStartSequence(packet.getMessageSequence());
+ ack.setEndSequence(packet.getMessageSequence());
+ ack.setCorrelationId(packet.getId());
+ return ack;
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<html>
<head>
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Fri Jul 19 18:44:21 2013
@@ -18,39 +18,19 @@ package org.apache.activeblaze.jms;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
+
+import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
import org.apache.activeblaze.BlazeMessageListener;
-import org.apache.activeblaze.BlazeMessageProcessor;
import org.apache.activeblaze.Subscription;
import org.apache.activeblaze.group.BlazeGroupChannel;
-import org.apache.activeblaze.jms.message.BlazeJmsMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation;
import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.wire.DestinationData;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+
/**
* Implementation of a JMS Connection
- *
*/
public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
- org.apache.activeblaze.ExceptionListener, BlazeMessageProcessor {
+ org.apache.activeblaze.ExceptionListener {
protected final BlazeGroupChannel channel;
protected final IdGenerator tempDestinationGenerator = new IdGenerator("");
private String clientId;
@@ -64,11 +44,9 @@ public class BlazeJmsConnection implemen
this.channel = channel;
this.channel.setExceptionListener(this);
this.clientId = channel.getName();
- this.channel.setBlazeMessageProcessor(this);
}
/**
- * @throws JMSException
* @see javax.jms.Connection#close()
*/
public void close() throws JMSException {
@@ -85,44 +63,30 @@ public class BlazeJmsConnection implemen
}
/**
- * @param destination
- * @param messageSelector
- * @param sessionPool
- * @param maxMessages
* @return ConnectionConsumer
- * @throws JMSException
* @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination,
* java.lang.String, javax.jms.ServerSessionPool, int)
*/
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
return null;
}
/**
- * @param topic
- * @param subscriptionName
- * @param messageSelector
- * @param sessionPool
- * @param maxMessages
* @return ConnectionConsumer
- * @throws JMSException
* @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic,
* java.lang.String, java.lang.String, javax.jms.ServerSessionPool,
* int)
*/
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
- String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
return null;
}
/**
- * @param transacted
- * @param acknowledgeMode
* @return Session
- * @throws JMSException
* @see javax.jms.Connection#createSession(boolean, int)
*/
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
@@ -158,8 +122,6 @@ public class BlazeJmsConnection implemen
}
/**
- * @param clientID
- * @throws JMSException
* @see javax.jms.Connection#setClientID(java.lang.String)
*/
public void setClientID(String clientID) throws JMSException {
@@ -175,7 +137,6 @@ public class BlazeJmsConnection implemen
}
/**
- * @param listener
* @see javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)
*/
public void setExceptionListener(ExceptionListener listener) {
@@ -183,7 +144,6 @@ public class BlazeJmsConnection implemen
}
/**
- * @throws JMSException
* @see javax.jms.Connection#start()
*/
public void start() throws JMSException {
@@ -196,7 +156,6 @@ public class BlazeJmsConnection implemen
}
/**
- * @throws JMSException
* @see javax.jms.Connection#stop()
*/
public void stop() throws JMSException {
@@ -209,26 +168,18 @@ public class BlazeJmsConnection implemen
}
/**
- * @param topic
- * @param messageSelector
- * @param sessionPool
- * @param maxMessages
* @return ConnectionConsumer
- * @throws JMSException
* @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic,
* java.lang.String, javax.jms.ServerSessionPool, int)
*/
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
return null;
}
/**
- * @param transacted
- * @param acknowledgeMode
* @return TopicSession
- * @throws JMSException
* @see javax.jms.TopicConnection#createTopicSession(boolean, int)
*/
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
@@ -240,26 +191,18 @@ public class BlazeJmsConnection implemen
}
/**
- * @param queue
- * @param messageSelector
- * @param sessionPool
- * @param maxMessages
* @return ConnectionConsumer
- * @throws JMSException
* @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue,
* java.lang.String, javax.jms.ServerSessionPool, int)
*/
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
return null;
}
/**
- * @param transacted
- * @param acknowledgeMode
* @return QueueSession
- * @throws JMSException
* @see javax.jms.QueueConnection#createQueueSession(boolean, int)
*/
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
@@ -271,7 +214,6 @@ public class BlazeJmsConnection implemen
}
/**
- * @param ex
* @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
*/
public void onException(Exception ex) {
@@ -279,7 +221,6 @@ public class BlazeJmsConnection implemen
}
/**
- * @param ex
* @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
*/
public void onException(JMSException ex) {
@@ -347,7 +288,7 @@ public class BlazeJmsConnection implemen
/**
* Get the consumerMaxDispatchQueueDepth
- *
+ *
* @return the consumerMaxDispatchQueueDepth
*/
public int getConsumerMaxDispatchQueueDepth() {
@@ -356,43 +297,10 @@ public class BlazeJmsConnection implemen
/**
* Set the consumerMaxDispatchQueueDepth
- *
- * @param consumerMaxDispatchQueueDepth
- * the consumerMaxDispatchQueueDepth to set
+ *
+ * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set
*/
public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {
this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth;
}
-
- /**
- * @param data
- * @return a BlazeMessage
- * @throws Exception
- *
- */
- public BlazeJmsMessage processBlazeMessage(PacketData data) throws Exception {
- BlazeJmsMessage result = null;
- if (data != null) {
- DestinationData destination = data.getDestinationData();
- Buffer payload = data.getPayload();
- BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
- String fromId = null;
- if (data.hasProducerId()) {
- fromId = data.getProducerId().toStringUtf8();
- }
- result = BlazeJmsMessageTransformation.createMessage(data.getPayloadType());
- result.setDestination(destination);
- result.setFromId(fromId);
- if (data.hasMessageId()) {
- result.setMessageId(data.getMessageId().toStringUtf8());
- }
- if (data.hasCorrelationId()) {
- result.setCorrelationId(data.getCorrelationId().toStringUtf8());
- }
- result.setTimeStamp(blazeData.getTimestamp());
- result.setType(data.getPayloadType());
- result.setContent(blazeData);
- }
- return result;
- }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java Fri Jul 19 18:44:21 2013
@@ -16,15 +16,10 @@
*/
package org.apache.activeblaze.jms;
-import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.group.BlazeGroupChannelFactory;
-import org.apache.activeblaze.group.BlazeGroupConfiguration;
-import org.apache.activeblaze.jndi.JNDIStorable;
-import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.util.PropertyUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@@ -32,17 +27,22 @@ import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.group.BlazeGroupChannelFactory;
+import org.apache.activeblaze.group.BlazeGroupConfiguration;
+import org.apache.activeblaze.jndi.JNDIStorable;
+import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.util.PropertyUtil;
/**
* Jms ConnectionFactory implementation
- *
*/
public class BlazeJmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory,
-TopicConnectionFactory {
+ TopicConnectionFactory {
private static final IdGenerator NAME_GENERATOR = new IdGenerator();
private final BlazeGroupChannelFactory groupChannelFactory;
private final Map<String, String> props = new HashMap<String, String>();
- private int consumerMaxDispatchQueueDepth=10000;
+ private int consumerMaxDispatchQueueDepth = 10000;
/**
* Constructor
@@ -53,8 +53,6 @@ TopicConnectionFactory {
/**
* Constructor
- *
- * @param config
*/
public BlazeJmsConnectionFactory(BlazeGroupConfiguration config) {
this.groupChannelFactory = new BlazeGroupChannelFactory(config);
@@ -69,11 +67,10 @@ TopicConnectionFactory {
/**
* Set properties
- * @param props
*/
public void setProperties(Properties props) {
- Map<String,String> map = new HashMap<String, String>();
- for (Map.Entry<Object,Object> entry: props.entrySet()) {
+ Map<String, String> map = new HashMap<String, String>();
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
map.put(entry.getKey().toString(), entry.getValue().toString());
}
setProperties(map);
@@ -85,7 +82,6 @@ TopicConnectionFactory {
}
/**
- * @param props
* @see org.apache.activeblaze.jndi.JNDIStorable#buildFromProperties(Map<String, String> map)
*/
@Override
@@ -95,8 +91,6 @@ TopicConnectionFactory {
}
/**
- *
- * @param map
* @see org.apache.activeblaze.jndi.JNDIStorable#populateProperties(Map<String, String> map)
*/
@Override
@@ -114,7 +108,6 @@ TopicConnectionFactory {
/**
* @return a TopicConnection
- * @throws JMSException
* @see javax.jms.TopicConnectionFactory#createTopicConnection()
*/
public TopicConnection createTopicConnection() throws JMSException {
@@ -129,10 +122,7 @@ TopicConnectionFactory {
}
/**
- * @param userName
- * @param password
* @return a TopicConnection
- * @throws JMSException
* @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String, java.lang.String)
*/
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
@@ -148,7 +138,6 @@ TopicConnectionFactory {
/**
* @return a Connection
- * @throws JMSException
* @see javax.jms.ConnectionFactory#createConnection()
*/
public Connection createConnection() throws JMSException {
@@ -163,10 +152,7 @@ TopicConnectionFactory {
}
/**
- * @param userName
- * @param password
* @return Connection
- * @throws JMSException
* @see javax.jms.ConnectionFactory#createConnection(java.lang.String, java.lang.String)
*/
public Connection createConnection(String userName, String password) throws JMSException {
@@ -182,7 +168,6 @@ TopicConnectionFactory {
/**
* @return a QueueConnection
- * @throws JMSException
* @see javax.jms.QueueConnectionFactory#createQueueConnection()
*/
public QueueConnection createQueueConnection() throws JMSException {
@@ -197,10 +182,7 @@ TopicConnectionFactory {
}
/**
- * @param userName
- * @param password
* @return a QueueConnection
- * @throws JMSException
* @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String, java.lang.String)
*/
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
@@ -216,6 +198,7 @@ TopicConnectionFactory {
/**
* Get the consumerMaxDispatchQueueDepth
+ *
* @return the consumerMaxDispatchQueueDepth
*/
public int getConsumerMaxDispatchQueueDepth() {
@@ -224,6 +207,7 @@ TopicConnectionFactory {
/**
* Set the consumerMaxDispatchQueueDepth
+ *
* @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set
*/
public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {