You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2014/09/04 19:16:23 UTC
git commit: Applied the patch submitted in DIRMINA-964
Repository: mina
Updated Branches:
refs/heads/2.0 a2b686e11 -> bf5ee6508
Applied the patch submitted in DIRMINA-964
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/bf5ee650
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/bf5ee650
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/bf5ee650
Branch: refs/heads/2.0
Commit: bf5ee65088992f117acfb41cf3c372da30ee6105
Parents: a2b686e
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Thu Sep 4 19:16:13 2014 +0200
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Thu Sep 4 19:16:13 2014 +0200
----------------------------------------------------------------------
.../core/polling/AbstractPollingIoAcceptor.java | 39 ++++++++++++++---
.../core/service/SimpleIoProcessorPool.java | 36 ++++++++++++---
.../mina/transport/socket/nio/NioProcessor.java | 35 ++++++++++++++-
.../transport/socket/nio/NioSocketAcceptor.java | 46 +++++++++++++++++++-
.../transport/socket/apr/AprSocketAcceptor.java | 6 +++
5 files changed, 145 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/bf5ee650/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index b511b73..5a4275c 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -21,6 +21,7 @@ package org.apache.mina.core.polling;
import java.net.SocketAddress;
import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -110,7 +111,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
* type.
*/
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
- this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
+ this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
}
/**
@@ -129,7 +130,27 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
*/
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
int processorCount) {
- this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true);
+ this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
+ }
+
+ /**
+ * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
+ * session configuration, a class of {@link IoProcessor} which will be instantiated in a
+ * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
+ * systems.
+ *
+ * @see SimpleIoProcessorPool
+ *
+ * @param sessionConfig
+ * the default configuration for the managed {@link IoSession}
+ * @param processorClass a {@link Class}�of {@link IoProcessor} for the associated {@link IoSession}
+ * type.
+ * @param processorCount the amount of processor to instantiate for the pool
+ * @param selectorProvider The SelectorProvider to use
+ */
+ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
+ int processorCount, SelectorProvider selectorProvider ) {
+ this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
}
/**
@@ -145,7 +166,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
* events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
*/
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
- this(sessionConfig, null, processor, false);
+ this(sessionConfig, null, processor, false, null);
}
/**
@@ -165,7 +186,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
* events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
*/
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
- this(sessionConfig, executor, processor, false);
+ this(sessionConfig, executor, processor, false, null);
}
/**
@@ -188,7 +209,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
* will be automatically disposed
*/
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
- boolean createdProcessor) {
+ boolean createdProcessor, SelectorProvider selectorProvider) {
super(sessionConfig, executor);
if (processor == null) {
@@ -200,7 +221,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
try {
// Initialize the selector
- init();
+ init(selectorProvider);
// The selector is now ready, we can switch the
// flag to true so that incoming connection can be accepted
@@ -227,6 +248,12 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
protected abstract void init() throws Exception;
/**
+ * Initialize the polling system, will be called at construction time.
+ * @throws Exception any exception thrown by the underlying system calls
+ */
+ protected abstract void init(SelectorProvider selectorProvider) throws Exception;
+
+ /**
* Destroy the polling system, will be called when this {@link IoAcceptor}
* implementation will be disposed.
* @throws Exception any exception thrown by the underlying systems calls
http://git-wip-us.apache.org/repos/asf/mina/blob/bf5ee650/mina-core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java b/mina-core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
index d393c2c..ffef29c 100644
--- a/mina-core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
+++ b/mina-core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
@@ -20,6 +20,7 @@
package org.apache.mina.core.service;
import java.lang.reflect.Constructor;
+import java.nio.channels.spi.SelectorProvider;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -111,7 +112,7 @@ public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoPro
* @param processorType The type of IoProcessor to use
*/
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
- this(processorType, null, DEFAULT_SIZE);
+ this(processorType, null, DEFAULT_SIZE, null);
}
/**
@@ -122,7 +123,19 @@ public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoPro
* @param size The number of IoProcessor in the pool
*/
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) {
- this(processorType, null, size);
+ this(processorType, null, size, null);
+ }
+
+ /**
+ * Creates a new instance of SimpleIoProcessorPool with a defined
+ * number of IoProcessors in the pool
+ *
+ * @param processorType The type of IoProcessor to use
+ * @param size The number of IoProcessor in the pool
+ * @param selectorProvider The SelectorProvider to use
+ */
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size, SelectorProvider selectorProvider) {
+ this(processorType, null, size, selectorProvider);
}
/**
@@ -132,7 +145,7 @@ public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoPro
* @param executor The {@link Executor}
*/
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
- this(processorType, executor, DEFAULT_SIZE);
+ this(processorType, executor, DEFAULT_SIZE, null);
}
/**
@@ -143,7 +156,7 @@ public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoPro
* @param size The number of IoProcessor in the pool
*/
@SuppressWarnings("unchecked")
- public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size) {
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, SelectorProvider selectorProvider) {
if (processorType == null) {
throw new IllegalArgumentException("processorType");
}
@@ -178,8 +191,13 @@ public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoPro
} catch (NoSuchMethodException e1) {
// To the next step...
try {
- processorConstructor = processorType.getConstructor(Executor.class);
- pool[0] = processorConstructor.newInstance(this.executor);
+ if(selectorProvider==null) {
+ processorConstructor = processorType.getConstructor(Executor.class);
+ pool[0] = processorConstructor.newInstance(this.executor);
+ } else {
+ processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
+ pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
+ }
} catch (NoSuchMethodException e2) {
// To the next step...
try {
@@ -213,7 +231,11 @@ public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoPro
for (int i = 1; i < pool.length; i++) {
try {
if (usesExecutorArg) {
- pool[i] = processorConstructor.newInstance(this.executor);
+ if(selectorProvider==null) {
+ pool[i] = processorConstructor.newInstance(this.executor);
+ } else {
+ pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
+ }
} else {
pool[i] = processorConstructor.newInstance();
}
http://git-wip-us.apache.org/repos/asf/mina/blob/bf5ee650/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
index 1cbc1ee..3692ea9 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
@@ -26,6 +26,7 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -45,6 +46,8 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
/** The selector associated with this processor */
private Selector selector;
+ private SelectorProvider selectorProvider = null;
+
/**
*
* Creates a new instance of NioProcessor.
@@ -62,6 +65,28 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
}
}
+ /**
+ *
+ * Creates a new instance of NioProcessor.
+ *
+ * @param executor
+ */
+ public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
+ super(executor);
+
+ try {
+ // Open a new selector
+ if (selectorProvider == null) {
+ selector = Selector.open();
+ } else {
+ selector = selectorProvider.openSelector();
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeIoException("Failed to open a selector.", e);
+ }
+ }
+
@Override
protected void doDispose() throws Exception {
selector.close();
@@ -127,7 +152,13 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
Set<SelectionKey> keys = selector.keys();
// Open a new selector
- Selector newSelector = Selector.open();
+ Selector newSelector = null;
+
+ if (selectorProvider == null) {
+ newSelector = Selector.open();
+ } else {
+ newSelector = selectorProvider.openSelector();
+ }
// Loop on all the registered keys, and register them on the new selector
for (SelectionKey key : keys) {
@@ -342,4 +373,4 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
iterator.remove();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/mina/blob/bf5ee650/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
index ca12831..f73e3ce 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
@@ -28,6 +28,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Executor;
@@ -51,6 +52,7 @@ public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSessio
implements SocketAcceptor {
private volatile Selector selector;
+ private volatile SelectorProvider selectorProvider = null;
/**
* Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
@@ -96,6 +98,21 @@ public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSessio
}
/**
+ * Constructor for {@link NioSocketAcceptor} using default parameters, and
+ * given number of {@link NioProcessor} for multithreading I/O operations, and
+ * a custom SelectorProvider for NIO
+ *
+ * @param processorCount the number of processor to create and place in a
+ * @param selectorProvider teh SelectorProvider to use
+ * {@link SimpleIoProcessorPool}
+ */
+ public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
+ super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
+ ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
+ this.selectorProvider = selectorProvider;
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -107,6 +124,20 @@ public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSessio
* {@inheritDoc}
*/
@Override
+ protected void init(SelectorProvider selectorProvider) throws Exception {
+ this.selectorProvider = selectorProvider;
+
+ if (selectorProvider == null) {
+ selector = Selector.open();
+ } else {
+ selector = selectorProvider.openSelector();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
protected void destroy() throws Exception {
if (selector != null) {
selector.close();
@@ -149,7 +180,11 @@ public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSessio
@Override
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
- SelectionKey key = handle.keyFor(selector);
+ SelectionKey key = null;
+
+ if (handle != null) {
+ key = handle.keyFor(selector);
+ }
if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
return null;
@@ -171,7 +206,14 @@ public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSessio
@Override
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
// Creates the listening ServerSocket
- ServerSocketChannel channel = ServerSocketChannel.open();
+
+ ServerSocketChannel channel = null;
+
+ if (selectorProvider != null) {
+ channel = selectorProvider.openServerSocketChannel();
+ } else {
+ channel = ServerSocketChannel.open();
+ }
boolean success = false;
http://git-wip-us.apache.org/repos/asf/mina/blob/bf5ee650/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSocketAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSocketAcceptor.java b/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSocketAcceptor.java
index ea3f765..3a1cef9 100644
--- a/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSocketAcceptor.java
+++ b/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSocketAcceptor.java
@@ -22,6 +22,7 @@ package org.apache.mina.transport.socket.apr;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -364,4 +365,9 @@ public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSessio
private void throwException(int code) throws IOException {
throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
}
+
+ @Override
+ protected void init(SelectorProvider selectorProvider) throws Exception {
+ init();
+ }
}