You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/20 14:18:08 UTC
svn commit: r596653 - in /mina/trunk/core/src/main/java/org/apache/mina:
common/AbstractPollingIoAcceptor.java
transport/socket/nio/NioSocketAcceptor.java
Author: trustin
Date: Tue Nov 20 05:18:08 2007
New Revision: 596653
URL: http://svn.apache.org/viewvc?rev=596653&view=rev
Log:
Implemented an abstraction layer for polling acceptors
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
Added: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=596653&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java Tue Nov 20 05:18:08 2007
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+import java.nio.channels.ClosedSelectorException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+
+/**
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
+ extends AbstractIoAcceptor {
+
+ private static final AtomicInteger id = new AtomicInteger();
+
+ private final Executor executor;
+ private final String threadName;
+ private final IoProcessor<T> processor;
+ private final boolean createdProcessor;
+
+ private final Object lock = new Object();
+
+ private final Queue<ServiceOperationFuture> registerQueue =
+ new ConcurrentLinkedQueue<ServiceOperationFuture>();
+ private final Queue<ServiceOperationFuture> cancelQueue =
+ new ConcurrentLinkedQueue<ServiceOperationFuture>();
+
+ private final Map<SocketAddress, H> boundHandles =
+ Collections.synchronizedMap(new HashMap<SocketAddress, H>());
+ private Worker worker;
+
+ /**
+ * Create an acceptor with a single processing thread using a NewThreadExecutor
+ */
+ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
+ this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
+ }
+
+ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
+ this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
+ }
+
+ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
+ this(sessionConfig, null, processor, false);
+ }
+
+ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
+ this(sessionConfig, executor, processor, false);
+ }
+
+ private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
+ super(sessionConfig);
+
+ if (executor == null) {
+ executor = new NewThreadExecutor();
+ }
+ if (processor == null) {
+ throw new NullPointerException("processor");
+ }
+
+ this.executor = executor;
+ this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
+ this.processor = processor;
+ this.createdProcessor = createdProcessor;
+
+ doInit();
+ }
+
+ protected abstract void doInit();
+ protected abstract void doDispose0();
+ protected abstract boolean selectable();
+ protected abstract boolean select() throws Exception;
+ protected abstract void wakeup();
+ protected abstract Iterator<H> selectedHandles();
+ protected abstract H bind(SocketAddress localAddress) throws Exception;
+ protected abstract SocketAddress localAddress(H handle) throws Exception;
+ protected abstract T accept(IoProcessor<T> processor, H handle) throws Exception;
+ protected abstract void unbind(H handle) throws Exception;
+
+ @Override
+ protected void doBind() throws Exception {
+ ServiceOperationFuture request = new ServiceOperationFuture();
+
+ // adds the Registration request to the queue for the Workers
+ // to handle
+ registerQueue.add(request);
+
+ // creates an instance of a Worker and has the local
+ // executor kick it off.
+ startupWorker();
+ wakeup();
+ request.awaitUninterruptibly();
+
+ if (request.getException() != null) {
+ throw request.getException();
+ }
+
+ // Update the local addresses.
+ // setLocalAddresses() shouldn't be called from the worker thread
+ // because of deadlock.
+ Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+ for (H handle: boundHandles.values()) {
+ newLocalAddresses.add(localAddress(handle));
+ }
+ setLocalAddresses(newLocalAddresses);
+ }
+
+ /**
+ * This method is called by the doBind() and doUnbind()
+ * methods. If the worker object is not null, presumably
+ * the acceptor is starting up, then the worker object will
+ * be created and kicked off by the executor. If the worker
+ * object is not null, probably already created and this class
+ * is now working, then nothing will happen and the method
+ * will just return.
+ */
+ private void startupWorker() {
+ if (!selectable()) {
+ registerQueue.clear();
+ cancelQueue.clear();
+ throw new ClosedSelectorException();
+ }
+ synchronized (lock) {
+ if (worker == null) {
+ worker = new Worker();
+
+ executor.execute(new NamePreservingRunnable(worker, threadName));
+ }
+ }
+ }
+
+ @Override
+ protected void doUnbind() throws Exception {
+ ServiceOperationFuture future = new ServiceOperationFuture();
+
+ cancelQueue.add(future);
+ startupWorker();
+ wakeup();
+
+ future.awaitUninterruptibly();
+ if (future.getException() != null) {
+ throw future.getException();
+ }
+ }
+
+ /**
+ * This class is called by the startupWorker() method and is
+ * placed into a NamePreservingRunnable class.
+ */
+ private class Worker implements Runnable {
+ public void run() {
+ int nHandles = 0;
+
+ for (;;) {
+ try {
+ // gets the number of keys that are ready to go
+ boolean selected = select();
+
+ // this actually sets the selector to OP_ACCEPT,
+ // and binds to the port in which this class will
+ // listen on
+ nHandles += registerHandles();
+
+ if (selected) {
+ processHandles(selectedHandles());
+ }
+
+ // check to see if any cancellation request has been made.
+ nHandles -= unregisterHandles();
+
+ if (nHandles == 0) {
+ synchronized (lock) {
+ if (registerQueue.isEmpty() &&
+ cancelQueue.isEmpty()) {
+ worker = null;
+ break;
+ }
+ }
+ }
+ } catch (Throwable e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+
+ if (isDisposed()) {
+ try {
+ if (createdProcessor) {
+ processor.dispose();
+ }
+ } finally {
+ doDispose0();
+ }
+ }
+ }
+
+ /**
+ * This method will process new sessions for the Worker class. All
+ * keys that have had their status updates as per the Selector.selectedKeys()
+ * method will be processed here. Only keys that are ready to accept
+ * connections are handled here.
+ * <p/>
+ * Session objects are created by making new instances of SocketSessionImpl
+ * and passing the session object to the SocketIoProcessor class.
+ */
+ @SuppressWarnings("unchecked")
+ private void processHandles(Iterator<H> handles) {
+ while (handles.hasNext()) {
+ H handle = handles.next();
+ handles.remove();
+
+ boolean success = false;
+ try {
+ T session = accept(processor, handle);
+ if (session == null) {
+ break;
+ }
+
+ finishSessionInitialization(session, null);
+
+ // add the session to the SocketIoProcessor
+ session.getProcessor().add(session);
+ success = true;
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ } finally {
+ if (!success) {
+ try {
+ unbind(handle);
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets up the socket communications. Sets items such as:
+ * <p/>
+ * Blocking
+ * Reuse address
+ * Receive buffer size
+ * Bind to listen port
+ * Registers OP_ACCEPT for selector
+ */
+ private int registerHandles() {
+ for (;;) {
+ ServiceOperationFuture future = registerQueue.poll();
+ if (future == null) {
+ break;
+ }
+
+ Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
+ List<SocketAddress> localAddresses = getLocalAddresses();
+
+ try {
+ for (SocketAddress a: localAddresses) {
+ H handle = null;
+ boolean success = false;
+ try {
+ handle = bind(a);
+ success = true;
+ } finally {
+ if (!success && handle != null) {
+ try {
+ unbind(handle);
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
+ newHandles.put(localAddress(handle), handle);
+ }
+
+ boundHandles.putAll(newHandles);
+
+ // and notify.
+ future.setDone();
+ } catch (Exception e) {
+ future.setException(e);
+ } finally {
+ // Roll back if failed to bind all addresses.
+ if (future.getException() != null) {
+ for (H handle: newHandles.values()) {
+ try {
+ unbind(handle);
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ wakeup();
+ }
+ }
+ }
+
+ return boundHandles.size();
+ }
+
+ /**
+ * This method just checks to see if anything has been placed into the
+ * cancellation queue. The only thing that should be in the cancelQueue
+ * is CancellationRequest objects and the only place this happens is in
+ * the doUnbind() method.
+ */
+ private int unregisterHandles() {
+ int cancelledHandles = 0;
+ for (; ;) {
+ ServiceOperationFuture future = cancelQueue.poll();
+ if (future == null) {
+ break;
+ }
+
+ // close the channels
+ for (H handle: boundHandles.values()) {
+ try {
+ unbind(handle);
+ wakeup(); // wake up again to trigger thread death
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+
+ cancelledHandles ++;
+ }
+
+ boundHandles.clear();
+ future.setDone();
+ }
+
+ return cancelledHandles;
+ }
+
+ public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
+ throw new UnsupportedOperationException();
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java?rev=596653&r1=596652&r2=596653&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java Tue Nov 20 05:18:08 2007
@@ -23,36 +23,24 @@
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
-import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.mina.common.AbstractIoAcceptor;
+import org.apache.mina.common.AbstractPollingIoAcceptor;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoProcessor;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIoException;
-import org.apache.mina.common.SimpleIoProcessorPool;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.SocketSessionConfig;
-import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.NewThreadExecutor;
/**
* {@link IoAcceptor} for socket transport (TCP/IP). This class
@@ -61,64 +49,36 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
*/
-public class NioSocketAcceptor extends AbstractIoAcceptor implements SocketAcceptor {
-
- private static final AtomicInteger id = new AtomicInteger();
+public class NioSocketAcceptor
+ extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
+ implements SocketAcceptor {
private int backlog = 50;
private boolean reuseAddress = true;
- private final Executor executor;
- private final String threadName;
- private final IoProcessor<NioSession> processor;
- private final boolean createdProcessor;
-
- private final Object lock = new Object();
-
- private final Queue<ServiceOperationFuture> registerQueue =
- new ConcurrentLinkedQueue<ServiceOperationFuture>();
- private final Queue<ServiceOperationFuture> cancelQueue =
- new ConcurrentLinkedQueue<ServiceOperationFuture>();
-
- private final Map<SocketAddress, ServerSocketChannel> serverChannels =
- Collections.synchronizedMap(new HashMap<SocketAddress, ServerSocketChannel>());
- private final Selector selector;
- private Worker worker;
+ private volatile Selector selector;
/**
* Create an acceptor with a single processing thread using a NewThreadExecutor
*/
public NioSocketAcceptor() {
- this(null, new SimpleIoProcessorPool<NioSession>(NioProcessor.class), true);
+ super(new DefaultSocketSessionConfig(), NioProcessor.class);
}
public NioSocketAcceptor(int processorCount) {
- this(null, new SimpleIoProcessorPool<NioSession>(NioProcessor.class, processorCount), true);
+ super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
}
public NioSocketAcceptor(IoProcessor<NioSession> processor) {
- this(null, processor, false);
+ super(new DefaultSocketSessionConfig(), processor);
}
public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
- this(executor, processor, false);
+ super(new DefaultSocketSessionConfig(), executor, processor);
}
- private NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor, boolean createdProcessor) {
- super(new DefaultSocketSessionConfig());
-
- if (executor == null) {
- executor = new NewThreadExecutor();
- }
- if (processor == null) {
- throw new NullPointerException("processor");
- }
-
- this.executor = executor;
- this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
- this.processor = processor;
- this.createdProcessor = createdProcessor;
-
+ @Override
+ protected void doInit() {
// The default reuseAddress of an accepted socket should be 'true'.
getSessionConfig().setReuseAddress(true);
@@ -143,23 +103,18 @@
try {
this.selector = Selector.open();
} catch (IOException e) {
- disposeNow();
+ doDispose0();
throw new RuntimeIoException("Failed to open a selector.", e);
}
}
- private void disposeNow() {
- try {
- if (createdProcessor) {
- processor.dispose();
- }
- } finally {
- if (selector != null) {
- try {
- selector.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
+ @Override
+ protected void doDispose0() {
+ if (selector != null) {
+ try {
+ selector.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
@@ -212,278 +167,99 @@
}
}
+
@Override
- protected void doBind() throws Exception {
- ServiceOperationFuture request = new ServiceOperationFuture();
-
- // adds the Registration request to the queue for the Workers
- // to handle
- registerQueue.add(request);
-
- // creates an instance of a Worker and has the local
- // executor kick it off.
- startupWorker();
+ protected NioSession accept(IoProcessor<NioSession> processor,
+ ServerSocketChannel handle) throws Exception {
- selector.wakeup();
-
- request.awaitUninterruptibly();
-
- if (request.getException() != null) {
- throw request.getException();
+ SelectionKey key = handle.keyFor(selector);
+ if (!key.isAcceptable()) {
+ return null;
}
- // Update the local addresses.
- // setLocalAddresses() shouldn't be called from the worker thread
- // because of deadlock.
- Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
- for (ServerSocketChannel c: serverChannels.values()) {
- newLocalAddresses.add(c.socket().getLocalSocketAddress());
+ // accept the connection from the client
+ SocketChannel ch = handle.accept();
+ if (ch == null) {
+ return null;
}
- setLocalAddresses(newLocalAddresses);
- }
-
- /**
- * This method is called by the doBind() and doUnbind()
- * methods. If the worker object is not null, presumably
- * the acceptor is starting up, then the worker object will
- * be created and kicked off by the executor. If the worker
- * object is not null, probably already created and this class
- * is now working, then nothing will happen and the method
- * will just return.
- */
- private void startupWorker() {
- if (!selector.isOpen()) {
- registerQueue.clear();
- cancelQueue.clear();
- throw new ClosedSelectorException();
- }
- synchronized (lock) {
- if (worker == null) {
- worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker, threadName));
- }
- }
+ return new NioSocketSession(this, processor, ch);
}
@Override
- protected void doUnbind() throws Exception {
- ServiceOperationFuture future = new ServiceOperationFuture();
-
- cancelQueue.add(future);
- startupWorker();
- selector.wakeup();
-
- future.awaitUninterruptibly();
- if (future.getException() != null) {
- throw future.getException();
- }
+ protected ServerSocketChannel bind(SocketAddress localAddress)
+ throws Exception {
+ ServerSocketChannel c = ServerSocketChannel.open();
+ c.configureBlocking(false);
+ // Configure the server socket,
+ c.socket().setReuseAddress(isReuseAddress());
+ c.socket().setReceiveBufferSize(
+ getSessionConfig().getReceiveBufferSize());
+ // and bind.
+ c.socket().bind(localAddress, getBacklog());
+ c.register(selector, SelectionKey.OP_ACCEPT);
+ return c;
}
- /**
- * This class is called by the startupWorker() method and is
- * placed into a NamePreservingRunnable class.
- */
- private class Worker implements Runnable {
- public void run() {
- for (;;) {
- try {
- // gets the number of keys that are ready to go
- int nKeys = selector.select();
-
- // this actually sets the selector to OP_ACCEPT,
- // and binds to the port in which this class will
- // listen on
- registerNew();
-
- if (nKeys > 0) {
- processSessions(selector.selectedKeys());
- }
-
- // check to see if any cancellation request has been made.
- cancelKeys();
-
- if (selector.keys().isEmpty()) {
- synchronized (lock) {
- if (selector.keys().isEmpty()
- && registerQueue.isEmpty()
- && cancelQueue.isEmpty()) {
- worker = null;
- break;
- }
- }
- }
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
-
- if (isDisposed()) {
- disposeNow();
- }
- }
-
- /**
- * This method will process new sessions for the Worker class. All
- * keys that have had their status updates as per the Selector.selectedKeys()
- * method will be processed here. Only keys that are ready to accept
- * connections are handled here.
- * <p/>
- * Session objects are created by making new instances of SocketSessionImpl
- * and passing the session object to the SocketIoProcessor class.
- */
- private void processSessions(Set<SelectionKey> keys) throws IOException {
- Iterator<SelectionKey> it = keys.iterator();
- while (it.hasNext()) {
- SelectionKey key = it.next();
-
- it.remove();
-
- if (!key.isAcceptable()) {
- continue;
- }
+ @Override
+ protected SocketAddress localAddress(ServerSocketChannel handle)
+ throws Exception {
+ return handle.socket().getLocalSocketAddress();
+ }
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
+ @Override
+ protected boolean select() throws Exception {
+ return selector.select() > 0;
+ }
- // accept the connection from the client
- SocketChannel ch = ssc.accept();
+ @Override
+ protected boolean selectable() {
+ return selector.isOpen();
+ }
- if (ch == null) {
- continue;
- }
+ @Override
+ protected Iterator<ServerSocketChannel> selectedHandles() {
+ return new ServerSocketChannelIterator(selector.selectedKeys());
+ }
- boolean success = false;
- try {
- // Create a new session object. This class extends
- // BaseIoSession and is custom for socket-based sessions.
- NioSocketSession session = new NioSocketSession(
- NioSocketAcceptor.this, processor, ch);
-
- finishSessionInitialization(session, null);
-
- // add the session to the SocketIoProcessor
- session.getProcessor().add(session);
- success = true;
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
- } finally {
- if (!success) {
- ch.close();
- }
- }
- }
+ @Override
+ protected void unbind(ServerSocketChannel handle) throws Exception {
+ SelectionKey key = handle.keyFor(selector);
+ if (key != null) {
+ key.cancel();
}
+ handle.close();
}
- /**
- * Sets up the socket communications. Sets items such as:
- * <p/>
- * Blocking
- * Reuse address
- * Receive buffer size
- * Bind to listen port
- * Registers OP_ACCEPT for selector
- */
- private void registerNew() {
- for (;;) {
- ServiceOperationFuture future = registerQueue.poll();
- if (future == null) {
- break;
- }
-
- Map<SocketAddress, ServerSocketChannel> newServerChannels =
- new HashMap<SocketAddress, ServerSocketChannel>();
- List<SocketAddress> localAddresses = getLocalAddresses();
-
- try {
- for (SocketAddress a: localAddresses) {
- ServerSocketChannel c = null;
- boolean success = false;
- try {
- c = ServerSocketChannel.open();
- c.configureBlocking(false);
- // Configure the server socket,
- c.socket().setReuseAddress(isReuseAddress());
- c.socket().setReceiveBufferSize(
- getSessionConfig().getReceiveBufferSize());
- // and bind.
- c.socket().bind(a, getBacklog());
- c.register(selector, SelectionKey.OP_ACCEPT, future);
- success = true;
- } finally {
- if (!success && c != null) {
- try {
- c.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
-
- newServerChannels.put(c.socket().getLocalSocketAddress(), c);
- }
-
- serverChannels.putAll(newServerChannels);
-
- // and notify.
- future.setDone();
- } catch (Exception e) {
- future.setException(e);
- } finally {
- // Roll back if failed to bind all addresses.
- if (future.getException() != null) {
- for (ServerSocketChannel c: newServerChannels.values()) {
- c.keyFor(selector).cancel();
- try {
- c.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- selector.wakeup();
- }
- }
- }
+ @Override
+ protected void wakeup() {
+ selector.wakeup();
}
- /**
- * This method just checks to see if anything has been placed into the
- * cancellation queue. The only thing that should be in the cancelQueue
- * is CancellationRequest objects and the only place this happens is in
- * the doUnbind() method.
- */
- private void cancelKeys() {
- for (; ;) {
- ServiceOperationFuture future = cancelQueue.poll();
- if (future == null) {
- break;
- }
+ @Override
+ public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
+ throw new UnsupportedOperationException();
+ }
+
+ private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
+
+ private final Iterator<SelectionKey> i;
+
+ private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
+ this.i = selectedKeys.iterator();
+ }
- // close the channels
- for (ServerSocketChannel c: serverChannels.values()) {
- try {
- SelectionKey key = c.keyFor(selector);
- key.cancel();
+ public boolean hasNext() {
+ return i.hasNext();
+ }
- selector.wakeup(); // wake up again to trigger thread death
- c.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
-
- serverChannels.clear();
- future.setDone();
+ public ServerSocketChannel next() {
+ SelectionKey key = i.next();
+ return (ServerSocketChannel) key.channel();
}
- }
- public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
- throw new UnsupportedOperationException();
+ public void remove() {
+ i.remove();
+ }
}
}