You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/12/14 22:48:44 UTC
svn commit: r890493 - in
/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio:
./ NIODispatcherThread.java SelectableDispatchContext.java
Author: cmacnaug
Date: Mon Dec 14 21:48:44 2009
New Revision: 890493
URL: http://svn.apache.org/viewvc?rev=890493&view=rev
Log:
Adding the beginnings of some NIO bits. These are adapted from an NIO implementation based on trunk, and more work is needed to bring them into the GCD model.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java (with props)
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java (with props)
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java?rev=890493&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java Mon Dec 14 21:48:44 2009
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.nio;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+
+/**
+ * A {@link DispatcherThread} that owns an NIO {@link Selector} and services
+ * the {@link SelectableDispatchContext}s registered with it.
+ * <p>
+ * While the thread is busy dispatching, the selector is polled without
+ * blocking at an adaptive interval (see {@link #dispatchHook()}). When the
+ * thread has nothing else to do it blocks in the selector instead (see
+ * {@link #waitForEvents()}).
+ * </p>
+ */
+public class NIODispatcherThread extends DispatcherThread {
+    private static final boolean DEBUG = false;
+
+    /** Amount, in nanoseconds (1ms), by which the polling interval is adjusted. */
+    private static final long FREQUENCY_STEP_NANOS = 1000000;
+
+    private final Selector selector;
+
+    /** {@link System#nanoTime()} reading taken at the most recent select. */
+    private long lastSelect = System.nanoTime();
+
+    /**
+     * Minimum time, in nanoseconds, that must elapse between two non-blocking
+     * polls issued from {@link #dispatchHook()}. Starts at 50ms and is tuned
+     * by {@link #doSelect(boolean)}; never allowed to drop below zero.
+     */
+    private long frequency = 50000000;
+
+    /**
+     * Creates the thread and opens its selector.
+     *
+     * @param dispatcher passed through to the superclass constructor
+     * @param name passed through to the superclass constructor
+     * @param priorities passed through to the superclass constructor
+     * @throws IOException if the selector cannot be opened
+     */
+    protected NIODispatcherThread(AdvancedDispatcher dispatcher, String name, int priorities) throws IOException {
+        super(dispatcher, name, priorities);
+        this.selector = Selector.open();
+    }
+
+    /**
+     * @return the selector owned by this thread.
+     */
+    Selector getSelector() {
+        return selector;
+    }
+
+    /**
+     * Closes the selector. A failure to close is only reported when
+     * debugging is enabled.
+     */
+    protected void cleanup() {
+        try {
+            selector.close();
+        } catch (IOException e) {
+            if (DEBUG) {
+                debug("Error closing selector", e);
+            }
+        }
+    }
+
+    /**
+     * Creates a new selectable context bound to this thread.
+     *
+     * @param dispatchable the runnable to execute when the context is selected
+     * @param name a name for the new context
+     * @return the new context
+     */
+    public SelectableDispatchContext registerSelectable(Runnable dispatchable, String name) {
+        return new SelectableDispatchContext(this, dispatchable, name);
+    }
+
+    /**
+     * Polls the selector without blocking, subject to the adaptive polling
+     * interval.
+     *
+     * @throws Exception if the select fails
+     */
+    protected void dispatchHook() throws Exception {
+        doSelect(true);
+    }
+
+    /**
+     * Performs a select that may block until a channel becomes ready, the
+     * selector is woken up, or the next timer is due.
+     *
+     * @throws Exception if the select fails
+     */
+    protected void waitForEvents() throws Exception {
+        doSelect(false);
+    }
+
+    /**
+     * Wakes this thread up if it is blocked in the selector.
+     */
+    protected void wakeup() {
+        selector.wakeup();
+    }
+
+    /**
+     * Runs one selection pass and services whatever became ready.
+     *
+     * @param now
+     *            {@code true} for a non-blocking poll that is skipped unless
+     *            at least {@link #frequency} nanoseconds have passed since
+     *            the last select; {@code false} for a select that may block.
+     * @throws IOException if the underlying select fails
+     */
+    private void doSelect(boolean now) throws IOException {
+        try {
+            if (now) {
+                // Nothing is registered, so there is nothing to poll for.
+                if (selector.keys().isEmpty()) {
+                    return;
+                }
+
+                long time = System.nanoTime();
+                if (time - lastSelect > frequency) {
+                    selector.selectNow();
+                    lastSelect = time;
+
+                    int registered = selector.keys().size();
+                    int selected = selector.selectedKeys().size();
+                    if (selected == 0) {
+                        // Idle poll: back off so less time is spent selecting.
+                        frequency += FREQUENCY_STEP_NANOS;
+                        if (DEBUG) {
+                            debug(this + "Increased select frequency to " + frequency);
+                        }
+                    } else if (selected > registered / 4) {
+                        // Busy poll: select more often. Clamp at zero so that
+                        // sustained load cannot drive the interval negative,
+                        // which would delay the next back-off.
+                        frequency = Math.max(0L, frequency - FREQUENCY_STEP_NANOS);
+                        if (DEBUG) {
+                            debug(this + "Decreased select frequency to " + frequency);
+                        }
+                    }
+                    processSelected();
+                }
+            } else {
+                // NOTE(review): -1 presumably means that no timer is pending;
+                // confirm against the timer heap implementation.
+                long next = timerHeap.timeToNext(TimeUnit.MILLISECONDS);
+                if (next == -1) {
+                    selector.select();
+                } else if (next > 0) {
+                    selector.select(next);
+                } else {
+                    // A timer is already due, so don't block.
+                    selector.selectNow();
+                }
+                lastSelect = System.nanoTime();
+                processSelected();
+            }
+        } catch (CancelledKeyException ignore) {
+            // A key may have been canceled while it was being serviced.
+        }
+    }
+
+    /**
+     * Walks the set of ready keys, servicing the context attached to each
+     * valid key. Keys that are invalid, and keys whose context reports that
+     * it is done, are removed from the selected set.
+     */
+    private void processSelected() {
+        Set<SelectionKey> selectedKeys = selector.selectedKeys();
+        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
+            SelectionKey key = i.next();
+
+            // Invalid keys and contexts that fail are always removed.
+            boolean done = true;
+            if (key.isValid()) {
+                SelectableDispatchContext context = (SelectableDispatchContext) key.attachment();
+                try {
+                    done = context.onSelect();
+                } catch (RuntimeException re) {
+                    if (DEBUG) {
+                        debug("Exception in " + context + " closing", re);
+                    }
+                    // If there is a runtime error close the context:
+                    // TODO better error handling here:
+                    context.close(false);
+                }
+            }
+
+            if (done) {
+                i.remove();
+            }
+        }
+    }
+
+    /**
+     * Prints a debug message prefixed with this thread.
+     */
+    private final void debug(String str) {
+        System.out.println(this + ": " + str);
+    }
+
+    /**
+     * Prints a debug message prefixed with this thread, followed by the
+     * stack trace of the given throwable.
+     */
+    private final void debug(String str, Throwable e) {
+        System.out.println(this + ": " + str);
+        e.printStackTrace();
+    }
+
+}
Propchange: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatcherThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java?rev=890493&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java Mon Dec 14 21:48:44 2009
@@ -0,0 +1,217 @@
+/**************************************************************************************
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved. *
+ * http://fusesource.com *
+ * ---------------------------------------------------------------------------------- *
+ * The software in this package is published under the terms of the AGPL license *
+ * a copy of which has been included with this distribution in the license.txt file. *
+ **************************************************************************************/
+package org.apache.activemq.dispatch.internal.nio;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+
+/**
+ * A {@link DispatchContext} that can register a {@link SelectableChannel}
+ * with the selector of its {@link NIODispatcherThread}. When the channel
+ * becomes ready for one of the registered interest ops, the context's
+ * runnable is handed to the dispatcher thread for execution.
+ *
+ * @author cmacnaug
+ * @version 1.0
+ */
+class SelectableDispatchContext extends DispatchContext {
+    public static final boolean DEBUG = false;
+    private SelectableChannel channel;
+    private SelectionKey key;
+    /** Interest ops carried over from a previous dispatcher; -1 when none are pending. */
+    private int updateInterests = -1;
+    /** Ops reported ready by the last selection and not yet re-armed. */
+    private int readyOps = 0;
+
+    /**
+     * @param thread the dispatcher thread that owns this context
+     * @param runnable the runnable to execute when the context is selected
+     * @param name a name for this context
+     */
+    SelectableDispatchContext(DispatcherThread thread, Runnable runnable, String name) {
+        super(thread, runnable, true, name);
+    }
+
+    /**
+     * This can be called to set a channel on which the dispatcher will
+     * perform selection operations. The channel may be changed over time;
+     * the interest ops of the previous channel's key (if it is still valid)
+     * are carried over to the new channel.
+     *
+     * This method may only be called from the dispatch method of this
+     * context's runnable, and is not thread safe.
+     *
+     * @param channel The channel on which to select.
+     * @throws ClosedChannelException If a closed channel is provided
+     */
+    public void setChannel(SelectableChannel channel) throws ClosedChannelException {
+        if (this.channel != channel) {
+            if (isClosed()) {
+                return;
+            }
+            int interests = 0;
+            if (key != null) {
+                // The key may already have been cancelled (for example by a
+                // dispatcher switch). Reading the interest ops of a cancelled
+                // key throws CancelledKeyException, so only consult keys
+                // that are still valid.
+                if (key.isValid()) {
+                    interests = key.interestOps();
+                    key.cancel();
+                }
+                key = null;
+            }
+            this.channel = channel;
+            if (channel != null) {
+                updateInterestOps(interests);
+            }
+        }
+    }
+
+    /**
+     * Called on a dispatcher switch. Cancels the key registered with the old
+     * dispatcher's selector and remembers its interest ops so that
+     * {@link #processForeignUpdates()} can re-register them later.
+     *
+     * @param oldDispatcher
+     *            The old dispatcher
+     * @param newDispatcher
+     *            The new dispatcher
+     */
+    protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
+        if (DEBUG) {
+            if (oldDispatcher == newDispatcher) {
+                System.out.println(this + " switching to same dispatcher " + newDispatcher + Thread.currentThread());
+            }
+        }
+        if (channel != null) {
+            if (DEBUG) {
+                System.out.println(this + "Canceling key on dispatcher switch: " + oldDispatcher + newDispatcher);
+            }
+
+            SelectionKey existing = channel.keyFor(((NIODispatcherThread) oldDispatcher).getSelector());
+            // keyFor() can return a key that has been cancelled but not yet
+            // deregistered; its interest ops cannot be read.
+            if (existing != null && existing.isValid()) {
+                updateInterests = existing.interestOps();
+                existing.cancel();
+            }
+        }
+    }
+
+    /**
+     * Re-registers any interest ops that were saved by
+     * {@link #switchedDispatcher(DispatcherThread, DispatcherThread)} and
+     * then delegates to the superclass.
+     */
+    public void processForeignUpdates() {
+        synchronized (this) {
+            if (channel != null) {
+                if (updateInterests > 0) {
+                    if (DEBUG) {
+                        debug(this + "processing foreign update interests: " + updateInterests);
+                    }
+                    updateInterestOps(updateInterests);
+                    updateInterests = -1;
+                }
+            }
+        }
+        super.processForeignUpdates();
+    }
+
+    /**
+     * Adds the given ops to the set of interest ops on which the dispatcher
+     * should select, registering the channel with the dispatcher's selector
+     * if there is no valid key yet. The given ops are also cleared from the
+     * value reported by {@link #readyOps()}.
+     *
+     * When an interest becomes ready, the dispatcher will execute this
+     * context's runnable. At that time, {@link #readyOps()} can be called to
+     * see what interests are now ready.
+     *
+     * This method may only be called from the dispatch method of this
+     * context's runnable and is not threadsafe. If the runnable wishes to
+     * change its interest ops from elsewhere it must request a dispatch so
+     * that they can be changed from the dispatch method.
+     *
+     * @param ops The interest ops to add.
+     */
+    public void updateInterestOps(int ops) {
+        readyOps &= ~ops;
+        if (key != null && key.isValid()) {
+            key.interestOps(key.interestOps() | ops);
+            return;
+        }
+        if (isClosed()) {
+            return;
+        }
+
+        NIODispatcherThread owner = (NIODispatcherThread) target;
+
+        // Make sure that we don't already have an invalidated key for
+        // this selector. If we do then do a select to get rid of the
+        // key:
+        SelectionKey existing = channel.keyFor(owner.getSelector());
+        if (existing != null && !existing.isValid()) {
+            if (DEBUG) {
+                debug(this + " registering existing invalid key:" + target + Thread.currentThread());
+            }
+            try {
+                owner.getSelector().selectNow();
+            } catch (IOException e) {
+                debug(this + " error flushing cancelled key before registration", e);
+            }
+        }
+        if (DEBUG) {
+            System.out.println(this + " registering new key with interests: " + ops);
+        }
+        try {
+            key = channel.register(owner.getSelector(), ops, this);
+        } catch (ClosedChannelException e) {
+            debug(this + " could not register closed channel", e);
+        }
+    }
+
+    /**
+     * Returns the operations that were ready when this context was last
+     * selected. Calling this method does not clear them; an operation is
+     * cleared when interest in it is registered again through
+     * {@link #updateInterestOps(int)}.
+     *
+     * This method may only be called from the dispatch method of this
+     * context's runnable and is not threadsafe.
+     *
+     * @return the readyOps.
+     */
+    public int readyOps() {
+        return readyOps;
+    }
+
+    /**
+     * Called by the dispatcher thread when this context's key has been
+     * selected. Records the ready ops, removes them from the key's interest
+     * set so they are not selected again until re-armed, and schedules the
+     * runnable if this context is not already linked.
+     *
+     * @return always {@code true}, so the key is removed from the selected set.
+     */
+    public boolean onSelect() {
+        readyOps = key.readyOps();
+        key.interestOps(key.interestOps() & ~key.readyOps());
+        synchronized (this) {
+            if (!isLinked()) {
+                target.execute(runnable, listPrio);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Closes this context. When called on the owning dispatcher thread the
+     * selection key is cancelled and flushed from the selector first.
+     *
+     * @param sync passed through to the superclass close
+     */
+    public void close(boolean sync) {
+        // actual close can only happen on the owning dispatch thread:
+        if (target == DispatcherThread.CURRENT.get()) {
+
+            if (key != null && key.isValid()) {
+                // This will make sure that the key is removed
+                // from the selector.
+                key.cancel();
+                try {
+                    ((NIODispatcherThread) target).getSelector().selectNow();
+                } catch (IOException e) {
+                    if (DEBUG) {
+                        debug("Error in close", e);
+                    }
+                }
+            }
+        }
+        super.close(sync);
+    }
+
+    /**
+     * Prints the given message to standard out.
+     */
+    protected void debug(String str) {
+        System.out.println(str);
+    }
+
+    /**
+     * Prints the given message (if any) to standard out and the stack trace
+     * of the given throwable (if any).
+     */
+    protected void debug(String str, Throwable thrown) {
+        if (str != null) {
+            System.out.println(str);
+        }
+        if (thrown != null) {
+            thrown.printStackTrace();
+        }
+    }
+}
Propchange: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/SelectableDispatchContext.java
------------------------------------------------------------------------------
svn:eol-style = native