You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/25 15:27:24 UTC
svn commit: r720505 [2/2] - in /activemq/activemq-blaze/trunk/src:
main/java/org/apache/activeblaze/
main/java/org/apache/activeblaze/coordinated/
main/java/org/apache/activeblaze/group/
main/java/org/apache/activeblaze/impl/processor/ main/java/org/ap...
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Nov 25 06:27:23 2008
@@ -24,10 +24,17 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
+import java.util.Map;
import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeNoRouteException;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.util.IOUtils;
+import org.apache.activeblaze.util.LRUCache;
+import org.apache.activeblaze.util.SendRequest;
+import org.apache.activeblaze.wire.AckData;
+import org.apache.activeblaze.wire.MessageType;
import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
/**
* UdpTransport
@@ -37,6 +44,7 @@
private DatagramChannel channel;
private ByteBuffer inBuffer;
private ByteBuffer outBuffer;
+ private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(1000);
public boolean init() throws Exception {
boolean result = super.init();
@@ -90,9 +98,35 @@
InputStream stream = IOUtils.getByteBufferInputStream(buffer);
PacketData data = PacketData.parseFramed(stream);
stream.close();
- Packet packet = new Packet(address, data);
- if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
- upStream(packet);
+ if (MessageType.ACK_DATA.getNumber() == data.getType()) {
+ synchronized (this.messageRequests) {
+ SendRequest request = this.messageRequests.remove(data.getCorrelationId());
+ if (request != null) {
+ MessageType type = MessageType.ACK_DATA;
+ AckData ack = (AckData) type.createMessage();
+ ack.mergeFramed(data.getPayload());
+ request.put(data.getMessageId(), ack);
+ }
+ }
+ } else {
+ if (data.getResponseRequired()) {
+ MessageType type = MessageType.ACK_DATA;
+ AckData ack = (AckData) type.createMessage();
+ ack.setMessageId(data.getMessageId());
+ PacketData pd = new PacketData();
+ pd.setResponseRequired(false);
+ pd.setCorrelationId(data.getMessageId());
+ pd.setType(type.getNumber());
+ pd.setFromAddress(getBufferOfLocalURI());
+ pd.setPayload(ack.toFramedBuffer());
+ Packet packet = new Packet(pd);
+ packet.setTo(address);
+ downStream(packet);
+ }
+ Packet packet = new Packet(address, data);
+ if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
+ upStream(packet);
+ }
}
}
buffer.clear();
@@ -102,6 +136,13 @@
public void downStream(Packet packet) throws Exception {
ByteBuffer buffer = this.outBuffer;
if (isStarted()) {
+ SendRequest request = null;
+ if (packet.getPacketData().getResponseRequired()) {
+ synchronized (this.messageRequests) {
+ request = new SendRequest();
+ this.messageRequests.put(packet.getPacketData().getMessageId(), request);
+ }
+ }
buffer.clear();
OutputStream stream = IOUtils.getByteBufferOutputStream(buffer);
if (isEnableAudit()) {
@@ -112,8 +153,13 @@
stream.close();
buffer.flip();
this.channel.send(buffer, packet.getTo());
+ if (request != null) {
+ if (request.get(0) == null) {
+ throw new BlazeNoRouteException("No response in " + getSoTimeout() + " ms from " + packet.getTo());
+ }
+ }
} else {
- throw new BlazeException("Not started");
+ throw new BlazeException("Not started - trying to downStream " + packet);
}
}
}
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java&r1=719718&r2=720505&rev=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java Tue Nov 25 06:27:23 2008
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.util;
import java.util.HashSet;
import java.util.Set;
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+
+public class ClassLoadingAwareObjectInputStream extends ObjectInputStream {
+
+ private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader();
+ /** <p>Maps primitive type names to corresponding class objects.</p> */
+ private static final HashMap<String, Class> primClasses = new HashMap<String, Class>(8, 1.0F);
+ public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ return load(classDesc.getName(), cl);
+ }
+
+ protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Class[] cinterfaces = new Class[interfaces.length];
+ for (int i = 0; i < interfaces.length; i++) {
+ cinterfaces[i] = load(interfaces[i], cl);
+ }
+
+ try {
+ return Proxy.getProxyClass(cinterfaces[0].getClassLoader(), cinterfaces);
+ } catch (IllegalArgumentException e) {
+ throw new ClassNotFoundException(null, e);
+ }
+ }
+
+ private Class load(String className, ClassLoader cl)
+ throws ClassNotFoundException {
+ try {
+ return Class.forName(className, false, cl);
+ } catch (ClassNotFoundException e) {
+ final Class clazz = (Class) primClasses.get(className);
+ if (clazz != null) {
+ return clazz;
+ } else {
+ return Class.forName(className, false, FALLBACK_CLASS_LOADER);
+ }
+ }
+ }
+
+
+
+ static {
+ primClasses.put("boolean", boolean.class);
+ primClasses.put("byte", byte.class);
+ primClasses.put("char", char.class);
+ primClasses.put("short", short.class);
+ primClasses.put("int", int.class);
+ primClasses.put("long", long.class);
+ primClasses.put("float", float.class);
+ primClasses.put("double", double.class);
+ primClasses.put("void", void.class);
+ }
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A Simple LRU Cache
+ *
+ * @version $Revision$
+ * @param <K>
+ * @param <V>
+ */
+
+public class LRUCache<K, V> extends LinkedHashMap<K, V> {
+ private static final long serialVersionUID = -342098639681884413L;
+ protected int maxCacheSize = 10000;
+
+ /**
+ * Default constructor for an LRU Cache The default capacity is 10000
+ */
+ public LRUCache() {
+ this(0,10000, 0.75f, true);
+ }
+
+ /**
+ * Constructs a LRUCache with a maximum capacity
+ *
+ * @param maximumCacheSize
+ */
+ public LRUCache(int maximumCacheSize) {
+ this(0, maximumCacheSize, 0.75f, true);
+ }
+
+ /**
+ * Constructs an empty <tt>LRUCache</tt> instance with the specified
+ * initial capacity, maximumCacheSize,load factor and ordering mode.
+ *
+ * @param initialCapacity the initial capacity.
+ * @param maximumCacheSize
+ * @param loadFactor the load factor.
+ * @param accessOrder the ordering mode - <tt>true</tt> for access-order,
+ * <tt>false</tt> for insertion-order.
+ * @throws IllegalArgumentException if the initial capacity is negative or
+ * the load factor is non-positive.
+ */
+
+ public LRUCache(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder) {
+ super(initialCapacity, loadFactor, accessOrder);
+ this.maxCacheSize = maximumCacheSize;
+ }
+
+ /**
+ * @return Returns the maxCacheSize.
+ */
+ public int getMaxCacheSize() {
+ return maxCacheSize;
+ }
+
+ /**
+ * @param maxCacheSize The maxCacheSize to set.
+ */
+ public void setMaxCacheSize(int maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ }
+
+ protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
+ return size() > maxCacheSize;
+ }
+}
+
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java&r1=719718&r2=720505&rev=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java Tue Nov 25 06:27:23 2008
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.util;
import org.apache.activemq.protobuf.Buffer;
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java&r1=719718&r2=720505&rev=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java Tue Nov 25 06:27:23 2008
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.util;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.protobuf.Buffer;
@@ -23,16 +23,16 @@
import org.apache.commons.logging.LogFactory;
/**
- * @author state on a request
+ * state on a request
*
*/
-class SendRequest {
+public class SendRequest {
private static final Log LOG = LogFactory.getLog(SendRequest.class);
private final AtomicBoolean done = new AtomicBoolean();
private Message<?> response;
private RequestCallback callback;
- Object get(long timeout) {
+ public Object get(long timeout) {
synchronized (this.done) {
if (this.done.get() == false && this.response == null) {
try {
@@ -45,7 +45,7 @@
return this.response;
}
- void put(Buffer id,Message<?> response) {
+ public void put(Buffer id,Message<?> response) {
this.response = response;
cancel();
RequestCallback callback = this.callback;
Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Nov 25 06:27:23 2008
@@ -30,21 +30,33 @@
BLAZE_DATA = 0;
MEMBER_DATA = 1;
ELECTION_MESSAGE = 2;
+ ACK_DATA = 3;
}
message PacketData {
- optional int32 type =1;
- optional bytes producerId = 2;
- optional bytes fromAddress =3;
- optional int64 sessionId = 4;
- optional int64 messageSequence = 5;
- optional bool reliable = 6;
- optional int32 numberOfParts= 7;
- optional int32 partNumber= 8;
- optional bytes payload= 9;
- optional bytes messageId =10;
- optional bytes correlationId = 11;
+ optional bool responseRequired = 1;
+ optional int32 type =2;
+ optional bytes producerId = 3;
+ optional bytes fromAddress =4;
+ optional int64 sessionId = 5;
+ optional int64 messageSequence = 6;
+ optional bool reliable = 7;
+ optional int32 numberOfParts= 8;
+ optional int32 partNumber= 9;
+ optional bytes payload= 10;
+ optional bytes messageId =11;
+ optional bytes correlationId = 12;
}
+ message AckData {
+ //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+ //| option java_type_method = "MessageType";
+ optional bytes messageId = 1;
+ optional int64 messageSequence =2;
+ optional bytes fromAddress =3;
+ optional int64 sessionId = 4;
+ optional int64 messageSequence = 5;
+ }
+
message DestinationData {
required bool topic = 1;
required bytes destination = 2;
@@ -127,6 +139,11 @@
optional bytes value = 2;
}
+ message BufferType {
+ optional string name = 1;
+ optional bytes value = 2;
+ }
+
message MapData {
@@ -142,6 +159,7 @@
repeated CharType charType = 10;
repeated BytesType bytesType = 11;
repeated MapData mapType = 12;
+ repeated BufferType bufferType = 13;
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Tue Nov 25 06:27:23 2008
@@ -29,13 +29,12 @@
*/
public class BlazeChannelTest extends TestCase {
public void testChannel() throws Exception {
- int count = 100;
+ int count = 10000;
final AtomicInteger received = new AtomicInteger();
String destination = "test.foo";
BlazeChannelFactory factory = new BlazeChannelFactory();
BlazeChannel sender = factory.createChannel();
BlazeChannel receiver = factory.createChannel();
- receiver.getConfiguration().setUseDispatchThread(true);
sender.start();
receiver.start();
final CountDownLatch latch = new CountDownLatch(count);
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Tue Nov 25 06:27:23 2008
@@ -19,13 +19,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.JMSException;
import junit.framework.TestCase;
import org.apache.activeblaze.BlazeChannel;
import org.apache.activeblaze.BlazeMessage;
/**
- * @author rajdavies
+ * Test BlazeGroupChannel
*
*/
public class BlazeGroupChannelTest extends TestCase {
Added: activemq/activemq-blaze/trunk/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/resources/log4j.properties?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/resources/log4j.properties (added)
+++ activemq/activemq-blaze/trunk/src/test/resources/log4j.properties Tue Nov 25 06:27:23 2008
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for eclipse testing, We want to see debug output on the console.
+#
+log4j.rootLogger=INFO, out
+
+
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
\ No newline at end of file