You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/25 15:27:24 UTC

svn commit: r720505 [2/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/coordinated/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/processor/ main/java/org/ap...

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Nov 25 06:27:23 2008
@@ -24,10 +24,17 @@
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
+import java.util.Map;
 import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeNoRouteException;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.util.IOUtils;
+import org.apache.activeblaze.util.LRUCache;
+import org.apache.activeblaze.util.SendRequest;
+import org.apache.activeblaze.wire.AckData;
+import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
 
 /**
  * UdpTransport
@@ -37,6 +44,7 @@
     private DatagramChannel channel;
     private ByteBuffer inBuffer;
     private ByteBuffer outBuffer;
+    private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(1000);
 
     public boolean init() throws Exception {
         boolean result = super.init();
@@ -90,9 +98,35 @@
                 InputStream stream = IOUtils.getByteBufferInputStream(buffer);
                 PacketData data = PacketData.parseFramed(stream);
                 stream.close();
-                Packet packet = new Packet(address, data);
-                if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
-                    upStream(packet);
+                if (MessageType.ACK_DATA.getNumber() == data.getType()) {
+                    synchronized (this.messageRequests) {
+                        SendRequest request = this.messageRequests.remove(data.getCorrelationId());
+                        if (request != null) {
+                            MessageType type = MessageType.ACK_DATA;
+                            AckData ack = (AckData) type.createMessage();
+                            ack.mergeFramed(data.getPayload());
+                            request.put(data.getMessageId(), ack);
+                        }
+                    }
+                } else {
+                    if (data.getResponseRequired()) {
+                        MessageType type = MessageType.ACK_DATA;
+                        AckData ack = (AckData) type.createMessage();
+                        ack.setMessageId(data.getMessageId());
+                        PacketData pd = new PacketData();
+                        pd.setResponseRequired(false);
+                        pd.setCorrelationId(data.getMessageId());
+                        pd.setType(type.getNumber());
+                        pd.setFromAddress(getBufferOfLocalURI());
+                        pd.setPayload(ack.toFramedBuffer());
+                        Packet packet = new Packet(pd);
+                        packet.setTo(address);
+                        downStream(packet);
+                    }
+                    Packet packet = new Packet(address, data);
+                    if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
+                        upStream(packet);
+                    }
                 }
             }
             buffer.clear();
@@ -102,6 +136,13 @@
     public void downStream(Packet packet) throws Exception {
         ByteBuffer buffer = this.outBuffer;
         if (isStarted()) {
+            SendRequest request = null;
+            if (packet.getPacketData().getResponseRequired()) {
+                synchronized (this.messageRequests) {
+                    request = new SendRequest();
+                    this.messageRequests.put(packet.getPacketData().getMessageId(), request);
+                }
+            }
             buffer.clear();
             OutputStream stream = IOUtils.getByteBufferOutputStream(buffer);
             if (isEnableAudit()) {
@@ -112,8 +153,13 @@
             stream.close();
             buffer.flip();
             this.channel.send(buffer, packet.getTo());
+            if (request != null) {
+                if (request.get(0) == null) {
+                    throw new BlazeNoRouteException("No response in " + getSoTimeout() + " ms from " + packet.getTo());
+                }
+            }
         } else {
-            throw new BlazeException("Not started");
+            throw new BlazeException("Not started - trying to downStream " + packet);
         }
     }
 }

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java&r1=719718&r2=720505&rev=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java Tue Nov 25 06:27:23 2008
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.util;
 
 import java.util.HashSet;
 import java.util.Set;

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+
+public class ClassLoadingAwareObjectInputStream extends ObjectInputStream {
+
+    private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader();
+    /** <p>Maps primitive type names to corresponding class objects.</p> */
+    private static final HashMap<String, Class> primClasses = new HashMap<String, Class>(8, 1.0F);
+    public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
+        super(in);
+    }
+
+    protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        return load(classDesc.getName(), cl);
+    }
+
+    protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Class[] cinterfaces = new Class[interfaces.length];
+        for (int i = 0; i < interfaces.length; i++) {
+            cinterfaces[i] = load(interfaces[i], cl);
+        }
+
+        try {
+            return Proxy.getProxyClass(cinterfaces[0].getClassLoader(), cinterfaces);
+        } catch (IllegalArgumentException e) {
+            throw new ClassNotFoundException(null, e);
+        }
+    }
+
+    private Class load(String className, ClassLoader cl)
+            throws ClassNotFoundException {
+        try {
+            return Class.forName(className, false, cl);
+        } catch (ClassNotFoundException e) {
+            final Class clazz = (Class) primClasses.get(className);
+            if (clazz != null) {
+                return clazz;
+            } else {
+                return Class.forName(className, false, FALLBACK_CLASS_LOADER);
+            }
+        }
+    }
+    
+    
+    
+    static {
+        primClasses.put("boolean", boolean.class);
+        primClasses.put("byte", byte.class);
+        primClasses.put("char", char.class);
+        primClasses.put("short", short.class);
+        primClasses.put("int", int.class);
+        primClasses.put("long", long.class);
+        primClasses.put("float", float.class);
+        primClasses.put("double", double.class);
+        primClasses.put("void", void.class);
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A Simple LRU Cache
+ * 
+ * @version $Revision$
+ * @param <K>
+ * @param <V>
+ */
+
+public class LRUCache<K, V> extends LinkedHashMap<K, V> {
+    private static final long serialVersionUID = -342098639681884413L;
+    protected int maxCacheSize = 10000;
+
+    /**
+     * Default constructor for an LRU Cache The default capacity is 10000
+     */
+    public LRUCache() {
+        this(0,10000, 0.75f, true);
+    }
+
+    /**
+     * Constructs a LRUCache with a maximum capacity
+     * 
+     * @param maximumCacheSize
+     */
+    public LRUCache(int maximumCacheSize) {
+        this(0, maximumCacheSize, 0.75f, true);
+    }
+
+    /**
+     * Constructs an empty <tt>LRUCache</tt> instance with the specified
+     * initial capacity, maximumCacheSize,load factor and ordering mode.
+     * 
+     * @param initialCapacity the initial capacity.
+     * @param maximumCacheSize
+     * @param loadFactor the load factor.
+     * @param accessOrder the ordering mode - <tt>true</tt> for access-order,
+     *                <tt>false</tt> for insertion-order.
+     * @throws IllegalArgumentException if the initial capacity is negative or
+     *                 the load factor is non-positive.
+     */
+
+    public LRUCache(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder) {
+        super(initialCapacity, loadFactor, accessOrder);
+        this.maxCacheSize = maximumCacheSize;
+    }
+
+    /**
+     * @return Returns the maxCacheSize.
+     */
+    public int getMaxCacheSize() {
+        return maxCacheSize;
+    }
+
+    /**
+     * @param maxCacheSize The maxCacheSize to set.
+     */
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+    }
+
+    protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
+        return size() > maxCacheSize;
+    }
+}
+

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java&r1=719718&r2=720505&rev=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java Tue Nov 25 06:27:23 2008
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.util;
 
 import org.apache.activemq.protobuf.Buffer;
 

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java&r1=719718&r2=720505&rev=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java Tue Nov 25 06:27:23 2008
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.util;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.protobuf.Buffer;
@@ -23,16 +23,16 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * @author state on a request
+ * state on a request
  *
  */
-class SendRequest {
+public class SendRequest {
     private static final Log LOG = LogFactory.getLog(SendRequest.class);
     private final AtomicBoolean done = new AtomicBoolean();
     private Message<?> response;
     private RequestCallback callback;
 
-    Object get(long timeout) {
+    public Object get(long timeout) {
         synchronized (this.done) {
             if (this.done.get() == false && this.response == null) {
                 try {
@@ -45,7 +45,7 @@
         return this.response;
     }
 
-    void put(Buffer id,Message<?> response) {
+    public void put(Buffer id,Message<?> response) {
         this.response = response;
         cancel();
         RequestCallback callback = this.callback;

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Nov 25 06:27:23 2008
@@ -30,21 +30,33 @@
   BLAZE_DATA = 0;
   MEMBER_DATA = 1;
   ELECTION_MESSAGE = 2;
+  ACK_DATA = 3;
 }
     message PacketData {   
-      optional int32 type =1;  
-	    optional bytes producerId = 2;
-	    optional bytes fromAddress =3;
-	    optional int64 sessionId = 4;
-      optional int64 messageSequence = 5;
-      optional bool reliable = 6;
-      optional int32 numberOfParts= 7;
-      optional int32 partNumber= 8;
-      optional bytes payload= 9;
-      optional bytes messageId =10;
-      optional bytes correlationId = 11;
+      optional bool responseRequired = 1;
+      optional int32 type =2;  
+	    optional bytes producerId = 3;
+	    optional bytes fromAddress =4;
+	    optional int64 sessionId = 5;
+      optional int64 messageSequence = 6;
+      optional bool reliable = 7;
+      optional int32 numberOfParts= 8;
+      optional int32 partNumber= 9;
+      optional bytes payload= 10;
+      optional bytes messageId =11;
+      optional bytes correlationId = 12;
 	  
     }
+    message AckData {
+     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+       //| option java_type_method = "MessageType";
+       optional bytes messageId = 1;
+       optional int64 messageSequence =2;
+       optional bytes fromAddress =3;
+       optional int64 sessionId = 4;
+       optional int64 messageSequence = 5;
+    }
+    
     message DestinationData {
       required bool topic = 1;
       required bytes destination = 2;
@@ -127,6 +139,11 @@
 	  optional bytes value = 2;
 	}
 	
+	message BufferType {
+	  optional string name = 1;
+	  optional bytes  value = 2;
+	}
+	
 	
     
     message MapData {
@@ -142,6 +159,7 @@
  	    repeated CharType charType = 10;
 	    repeated BytesType  bytesType = 11;
       repeated MapData  mapType = 12;
+      repeated BufferType  bufferType = 13;
       
     }
     

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Tue Nov 25 06:27:23 2008
@@ -29,13 +29,12 @@
  */
 public class BlazeChannelTest extends TestCase {
     public void testChannel() throws Exception {
-        int count = 100;
+        int count = 10000;
         final AtomicInteger received = new AtomicInteger();
         String destination = "test.foo";
         BlazeChannelFactory factory = new BlazeChannelFactory();
         BlazeChannel sender = factory.createChannel();
         BlazeChannel receiver = factory.createChannel();
-        receiver.getConfiguration().setUseDispatchThread(true);
         sender.start();
         receiver.start();
         final CountDownLatch latch = new CountDownLatch(count);

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Tue Nov 25 06:27:23 2008
@@ -19,13 +19,12 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.JMSException;
 import junit.framework.TestCase;
 import org.apache.activeblaze.BlazeChannel;
 import org.apache.activeblaze.BlazeMessage;
 
 /**
- * @author rajdavies
+ * Test BlazeGroupChannel
  * 
  */
 public class BlazeGroupChannelTest extends TestCase {

Added: activemq/activemq-blaze/trunk/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/resources/log4j.properties?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/resources/log4j.properties (added)
+++ activemq/activemq-blaze/trunk/src/test/resources/log4j.properties Tue Nov 25 06:27:23 2008
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for eclipse testing, We want to see debug output on the console.
+#
+log4j.rootLogger=INFO, out
+
+
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
\ No newline at end of file