You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/21 10:38:58 UTC
svn commit: r1505305 - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/impl/transport/
Author: rajdavies
Date: Sun Jul 21 08:38:58 2013
New Revision: 1505305
URL: http://svn.apache.org/r1505305
Log:
Use Disruptor to drain down sockets
Added:
activemq/activemq-blaze/trunk/activeblaze.iml
Modified:
activemq/activemq-blaze/trunk/pom.xml
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
Added: activemq/activemq-blaze/trunk/activeblaze.iml
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/activeblaze.iml?rev=1505305&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/activeblaze.iml (added)
+++ activemq/activemq-blaze/trunk/activeblaze.iml Sun Jul 21 08:38:58 2013
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
+ <output url="file://$MODULE_DIR$/target/classes" />
+ <output-test url="file://$MODULE_DIR$/target/test-classes" />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/src/main/resources" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/resources" isTestSource="true" />
+ <excludeFolder url="file://$MODULE_DIR$/target" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="library" name="Maven: commons-logging:commons-logging-api:1.0.4" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1" level="project" />
+ <orderEntry type="library" name="Maven: com.lmax:disruptor:3.1.1" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: log4j:log4j:1.2.14" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: junit:junit:3.8.1" level="project" />
+ </component>
+</module>
+
Modified: activemq/activemq-blaze/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/pom.xml?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/pom.xml (original)
+++ activemq/activemq-blaze/trunk/pom.xml Sun Jul 21 08:38:58 2013
@@ -49,6 +49,12 @@
<version>1.1</version>
</dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>3.1.1</version>
+ </dependency>
+
<dependency>
<groupId>commons-logging</groupId>
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Sun Jul 21 08:38:58 2013
@@ -16,26 +16,27 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.activeblaze.wire.Buffer;
-import org.apache.activeblaze.wire.BufferInputStream;
-import org.apache.activeblaze.wire.BufferOutputStream;
-import org.apache.activeblaze.wire.Packet;
-import org.apache.activeblaze.wire.PacketAudit;
-import org.apache.activeblaze.wire.PacketType;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import org.apache.activeblaze.wire.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
/**
* Base Class for transports
*/
public abstract class BaseTransport extends ThreadChainedProcessor {
private static final Log LOG = LogFactory.getLog(BaseTransport.class);
protected static final short MAGIC = 0xFAB;
- static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ static final int DEFAULT_BUFFER_SIZE = 512 * 1024;
private URI localURI;
private Buffer bufferOfLocalURI;
private int bufferSize = DEFAULT_BUFFER_SIZE;
@@ -45,19 +46,46 @@ public abstract class BaseTransport exte
protected final PacketAudit audit = new PacketAudit();
private boolean broadcast = true;
private boolean enableAudit = false;
- private int maxDispatchQueueSize = 10000;
- private LinkedBlockingQueue<Packet> dispatchQueue;
- private Thread dispatchQueueThread;
+ private int maxDispatchQueueSize = 1024;
+ private Disruptor<EventPacket> disruptor;
+ private RingBuffer<EventPacket> ringBuffer;
+ private ExecutorService executorService;
private BufferOutputStream bufferOut = new BufferOutputStream(1024);
private byte[] intBuffer = new byte[4];
+ private static class EventPacket {
+ private Packet packet;
+
+ private Packet getPacket() {
+ return packet;
+ }
+
+ private void setPacket(Packet packet) {
+ this.packet = packet;
+ }
+ }
+
public void doInit() throws Exception {
super.doInit();
this.audit.init();
if (this.localURI != null) {
this.bufferOfLocalURI = new Buffer(this.localURI.toString());
}
- this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
+ int maxDispatchSize = getMaxDispatchQueueSize();
+ //disruptor needs this to be a power of 2
+ if ((maxDispatchSize & (maxDispatchSize - 1)) != 0) {
+ //make it a power of 2
+ maxDispatchSize--;
+ maxDispatchSize |= maxDispatchQueueSize >> 1;
+ maxDispatchSize |= maxDispatchQueueSize >> 2;
+ maxDispatchSize |= maxDispatchQueueSize >> 4;
+ maxDispatchSize |= maxDispatchQueueSize >> 16;
+ maxDispatchSize |= maxDispatchQueueSize >> 32;
+ maxDispatchSize++;
+ LOG.warn("maxDispatchQueueSize has to be a power of 2 - setting from " + getMaxDispatchQueueSize() + " to " + maxDispatchSize);
+ setMaxDispatchQueueSize(maxDispatchSize);
+
+ }
}
public void doShutDown() throws Exception {
@@ -68,28 +96,60 @@ public abstract class BaseTransport exte
public void doStart() throws Exception {
super.doStart();
this.audit.start();
- Runnable runable = new Runnable() {
- public void run() {
- while (isStarted()) {
- dequeuePackets();
+
+
+ executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread result = new Thread(runnable, getLocalURI() + "-DisruptorThread");
+ result.setDaemon(true);
+ return result;
+ }
+ });
+
+ EventFactory<EventPacket> packetEventFactory = new EventFactory<EventPacket>() {
+ public EventPacket newInstance() {
+ return new EventPacket();
+ }
+ };
+ this.disruptor = new Disruptor<EventPacket>(packetEventFactory, getMaxDispatchQueueSize(), executorService);
+ final EventHandler<EventPacket> handler = new EventHandler<EventPacket>() {
+ public void onEvent(EventPacket eventPacket, long sequence, boolean endOfBatch) throws Exception {
+ Packet packet = eventPacket.getPacket();
+ try {
+ if (packet != null) {
+ passUpStream(packet);
+ }
+ } catch (InterruptedException e1) {
+ // we've stopped
+ } catch (Exception e) {
+ String value = "";
+ try {
+ value = packet.toString();
+ } catch (Throwable ignore) {
+ }
+ LOG.error("Caught an exception processing a packet: " + value, e);
+ stopInternal();
}
}
};
- this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
- this.dispatchQueueThread.setDaemon(true);
- this.dispatchQueueThread.start();
+ disruptor.handleEventsWith(handler);
+ ringBuffer = disruptor.start();
+ }
+
+ private void passUpStream(Packet packet) throws Exception {
+ super.upStream(packet);
}
public void doStop() throws Exception {
super.doStop();
this.audit.stop();
- if (this.dispatchQueueThread != null) {
- this.dispatchQueueThread.interrupt();
- try {
- this.dispatchQueueThread.join(100);
- } catch (InterruptedException e) {
- }
+ if (disruptor != null) {
+ disruptor.shutdown();
}
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+
}
/**
@@ -230,7 +290,11 @@ public abstract class BaseTransport exte
public void upStream(Packet packet) throws Exception {
if (!isStopped()) {
if (!this.enableAudit || !this.audit.isDuplicate(packet)) {
- this.dispatchQueue.put(packet);
+ long sequence = ringBuffer.next();
+ EventPacket eventPacket = ringBuffer.get(sequence);
+ eventPacket.setPacket(packet);
+ ringBuffer.publish(sequence);
+
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(toString() + " Ignoring duplicate packet: " + packet);
@@ -239,25 +303,6 @@ public abstract class BaseTransport exte
}
}
- protected void dequeuePackets() {
- Packet packet = null;
- try {
- packet = this.dispatchQueue.take();
- if (packet != null) {
- super.upStream(packet);
- }
- } catch (InterruptedException e1) {
- // we've stopped
- } catch (Exception e) {
- String value = "";
- try {
- value = packet.toString();
- } catch (Throwable ignore) {
- }
- LOG.error("Caught an exception processing a packet: " + value, e);
- stopInternal();
- }
- }
public final synchronized void downStream(Packet packet) throws Exception {
if (isInitialized()) {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Sun Jul 21 08:38:58 2013
@@ -16,22 +16,16 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.SocketAddress;
-import java.net.StandardProtocolFamily;
-import java.net.StandardSocketOptions;
-import java.net.URI;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.wire.Buffer;
+
+import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.wire.Buffer;
-
/**
* Multicast transport
*/
@@ -61,7 +55,7 @@ public class MulticastTransport extends
throw new BlazeException("Couldn't find an network interface named " + getNetworkInterfaceName());
}
try {
- receiveBuffer = ByteBuffer.allocateDirect(getBufferSize());
+ receiveBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
if (datagramChannel.isOpen()) {
datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, getBufferSize());
@@ -117,7 +111,10 @@ public class MulticastTransport extends
public void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception {
if (isInitialized()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(data, offset, length);
- this.datagramChannel.send(byteBuffer, to);
+
+ while (byteBuffer.hasRemaining()) {
+ int sent = this.datagramChannel.send(byteBuffer, to);
+ }
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java Sun Jul 21 08:38:58 2013
@@ -16,12 +16,12 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.net.SocketTimeoutException;
-
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.net.SocketTimeoutException;
+
/**
* Thread associated with processing
*/
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java Sun Jul 21 08:38:58 2013
@@ -16,12 +16,12 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.net.URI;
-import java.util.Map;
-
import org.apache.activeblaze.util.ObjectFinder;
import org.apache.activeblaze.util.PropertyUtil;
+import java.net.URI;
+import java.util.Map;
+
/**
* Find a Transport from a URI scheme
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=1505305&r1=1505304&r2=1505305&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Sun Jul 21 08:38:58 2013
@@ -16,15 +16,6 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.StandardProtocolFamily;
-import java.net.StandardSocketOptions;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.util.Map;
-
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.SendRequest;
@@ -32,6 +23,11 @@ import org.apache.activeblaze.wire.Ack;
import org.apache.activeblaze.wire.Buffer;
import org.apache.activeblaze.wire.Packet;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.util.Map;
+
/**
* UdpTransport
*/