You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/21 10:38:58 UTC

svn commit: r1505305 - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/impl/transport/

Author: rajdavies
Date: Sun Jul 21 08:38:58 2013
New Revision: 1505305

URL: http://svn.apache.org/r1505305
Log:
Use Disruptor to drain down sockets

Added:
    activemq/activemq-blaze/trunk/activeblaze.iml
Modified:
    activemq/activemq-blaze/trunk/pom.xml
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java

Added: activemq/activemq-blaze/trunk/activeblaze.iml
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/activeblaze.iml?rev=1505305&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/activeblaze.iml (added)
+++ activemq/activemq-blaze/trunk/activeblaze.iml Sun Jul 21 08:38:58 2013
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
+    <output url="file://$MODULE_DIR$/target/classes" />
+    <output-test url="file://$MODULE_DIR$/target/test-classes" />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/main/resources" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/resources" isTestSource="true" />
+      <excludeFolder url="file://$MODULE_DIR$/target" />
+    </content>
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="library" name="Maven: commons-logging:commons-logging-api:1.0.4" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1" level="project" />
+    <orderEntry type="library" name="Maven: com.lmax:disruptor:3.1.1" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: log4j:log4j:1.2.14" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:3.8.1" level="project" />
+  </component>
+</module>
+

Modified: activemq/activemq-blaze/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/pom.xml?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/pom.xml (original)
+++ activemq/activemq-blaze/trunk/pom.xml Sun Jul 21 08:38:58 2013
@@ -49,6 +49,12 @@
             <version>1.1</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.lmax</groupId>
+            <artifactId>disruptor</artifactId>
+            <version>3.1.1</version>
+        </dependency>
+
 
         <dependency>
             <groupId>commons-logging</groupId>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Sun Jul 21 08:38:58 2013
@@ -16,26 +16,27 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.activeblaze.wire.Buffer;
-import org.apache.activeblaze.wire.BufferInputStream;
-import org.apache.activeblaze.wire.BufferOutputStream;
-import org.apache.activeblaze.wire.Packet;
-import org.apache.activeblaze.wire.PacketAudit;
-import org.apache.activeblaze.wire.PacketType;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import org.apache.activeblaze.wire.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
 /**
  * Base Class for transports
  */
 public abstract class BaseTransport extends ThreadChainedProcessor {
     private static final Log LOG = LogFactory.getLog(BaseTransport.class);
     protected static final short MAGIC = 0xFAB;
-    static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    static final int DEFAULT_BUFFER_SIZE = 512 * 1024;
     private URI localURI;
     private Buffer bufferOfLocalURI;
     private int bufferSize = DEFAULT_BUFFER_SIZE;
@@ -45,19 +46,46 @@ public abstract class BaseTransport exte
     protected final PacketAudit audit = new PacketAudit();
     private boolean broadcast = true;
     private boolean enableAudit = false;
-    private int maxDispatchQueueSize = 10000;
-    private LinkedBlockingQueue<Packet> dispatchQueue;
-    private Thread dispatchQueueThread;
+    private int maxDispatchQueueSize = 1024;
+    private Disruptor<EventPacket> disruptor;
+    private RingBuffer<EventPacket> ringBuffer;
+    private ExecutorService executorService;
     private BufferOutputStream bufferOut = new BufferOutputStream(1024);
     private byte[] intBuffer = new byte[4];
 
+    private static class EventPacket {
+        private Packet packet;
+
+        private Packet getPacket() {
+            return packet;
+        }
+
+        private void setPacket(Packet packet) {
+            this.packet = packet;
+        }
+    }
+
     public void doInit() throws Exception {
         super.doInit();
         this.audit.init();
         if (this.localURI != null) {
             this.bufferOfLocalURI = new Buffer(this.localURI.toString());
         }
-        this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
+        int maxDispatchSize = getMaxDispatchQueueSize();
+        //disruptor needs this to be a power of 2
+        if ((maxDispatchSize & (maxDispatchSize - 1)) != 0) {
+            //make it a power of 2
+            maxDispatchSize--;
+            maxDispatchSize |= maxDispatchQueueSize >> 1;
+            maxDispatchSize |= maxDispatchQueueSize >> 2;
+            maxDispatchSize |= maxDispatchQueueSize >> 4;
+            maxDispatchSize |= maxDispatchQueueSize >> 16;
+            maxDispatchSize |= maxDispatchQueueSize >> 32;
+            maxDispatchSize++;
+            LOG.warn("maxDispatchQueueSize has to be a power of 2 - setting from " + getMaxDispatchQueueSize() + " to " + maxDispatchSize);
+            setMaxDispatchQueueSize(maxDispatchSize);
+
+        }
     }
 
     public void doShutDown() throws Exception {
@@ -68,28 +96,60 @@ public abstract class BaseTransport exte
     public void doStart() throws Exception {
         super.doStart();
         this.audit.start();
-        Runnable runable = new Runnable() {
-            public void run() {
-                while (isStarted()) {
-                    dequeuePackets();
+
+
+        executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread result = new Thread(runnable, getLocalURI() + "-DisruptorThread");
+                result.setDaemon(true);
+                return result;
+            }
+        });
+
+        EventFactory<EventPacket> packetEventFactory = new EventFactory<EventPacket>() {
+            public EventPacket newInstance() {
+                return new EventPacket();
+            }
+        };
+        this.disruptor = new Disruptor<EventPacket>(packetEventFactory, getMaxDispatchQueueSize(), executorService);
+        final EventHandler<EventPacket> handler = new EventHandler<EventPacket>() {
+            public void onEvent(EventPacket eventPacket, long sequence, boolean endOfBatch) throws Exception {
+                Packet packet = eventPacket.getPacket();
+                try {
+                    if (packet != null) {
+                        passUpStream(packet);
+                    }
+                } catch (InterruptedException e1) {
+                    // we've stopped
+                } catch (Exception e) {
+                    String value = "";
+                    try {
+                        value = packet.toString();
+                    } catch (Throwable ignore) {
+                    }
+                    LOG.error("Caught an exception processing a packet: " + value, e);
+                    stopInternal();
                 }
             }
         };
-        this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
-        this.dispatchQueueThread.setDaemon(true);
-        this.dispatchQueueThread.start();
+        disruptor.handleEventsWith(handler);
+        ringBuffer = disruptor.start();
+    }
+
+    private void passUpStream(Packet packet) throws Exception {
+        super.upStream(packet);
     }
 
     public void doStop() throws Exception {
         super.doStop();
         this.audit.stop();
-        if (this.dispatchQueueThread != null) {
-            this.dispatchQueueThread.interrupt();
-            try {
-                this.dispatchQueueThread.join(100);
-            } catch (InterruptedException e) {
-            }
+        if (disruptor != null) {
+            disruptor.shutdown();
         }
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+
     }
 
     /**
@@ -230,7 +290,11 @@ public abstract class BaseTransport exte
     public void upStream(Packet packet) throws Exception {
         if (!isStopped()) {
             if (!this.enableAudit || !this.audit.isDuplicate(packet)) {
-                this.dispatchQueue.put(packet);
+                long sequence = ringBuffer.next();
+                EventPacket eventPacket = ringBuffer.get(sequence);
+                eventPacket.setPacket(packet);
+                ringBuffer.publish(sequence);
+
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(toString() + " Ignoring duplicate packet: " + packet);
@@ -239,25 +303,6 @@ public abstract class BaseTransport exte
         }
     }
 
-    protected void dequeuePackets() {
-        Packet packet = null;
-        try {
-            packet = this.dispatchQueue.take();
-            if (packet != null) {
-                super.upStream(packet);
-            }
-        } catch (InterruptedException e1) {
-            // we've stopped
-        } catch (Exception e) {
-            String value = "";
-            try {
-                value = packet.toString();
-            } catch (Throwable ignore) {
-            }
-            LOG.error("Caught an exception processing a packet: " + value, e);
-            stopInternal();
-        }
-    }
 
     public final synchronized void downStream(Packet packet) throws Exception {
         if (isInitialized()) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Sun Jul 21 08:38:58 2013
@@ -16,22 +16,16 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.SocketAddress;
-import java.net.StandardProtocolFamily;
-import java.net.StandardSocketOptions;
-import java.net.URI;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.wire.Buffer;
+
+import java.net.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.MembershipKey;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.wire.Buffer;
-
 /**
  * Multicast transport
  */
@@ -61,7 +55,7 @@ public class MulticastTransport extends 
             throw new BlazeException("Couldn't find an network interface named " + getNetworkInterfaceName());
         }
         try {
-            receiveBuffer = ByteBuffer.allocateDirect(getBufferSize());
+            receiveBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
             datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
             if (datagramChannel.isOpen()) {
                 datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, getBufferSize());
@@ -117,7 +111,10 @@ public class MulticastTransport extends 
     public void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception {
         if (isInitialized()) {
             ByteBuffer byteBuffer = ByteBuffer.wrap(data, offset, length);
-            this.datagramChannel.send(byteBuffer, to);
+
+            while (byteBuffer.hasRemaining()) {
+                int sent = this.datagramChannel.send(byteBuffer, to);
+            }
         }
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java Sun Jul 21 08:38:58 2013
@@ -16,12 +16,12 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.net.SocketTimeoutException;
-
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.net.SocketTimeoutException;
+
 /**
  * Thread associated with processing
  */

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java Sun Jul 21 08:38:58 2013
@@ -16,12 +16,12 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.net.URI;
-import java.util.Map;
-
 import org.apache.activeblaze.util.ObjectFinder;
 import org.apache.activeblaze.util.PropertyUtil;
 
+import java.net.URI;
+import java.util.Map;
+
 
 /**
  * Find a Transport from a URI scheme

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Sun Jul 21 08:38:58 2013
@@ -16,15 +16,6 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.StandardProtocolFamily;
-import java.net.StandardSocketOptions;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.util.Map;
-
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.util.LRUCache;
 import org.apache.activeblaze.util.SendRequest;
@@ -32,6 +23,11 @@ import org.apache.activeblaze.wire.Ack;
 import org.apache.activeblaze.wire.Buffer;
 import org.apache.activeblaze.wire.Packet;
 
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.util.Map;
+
 /**
  * UdpTransport
  */