You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ad...@apache.org on 2011/07/31 19:27:59 UTC

svn commit: r1152599 - in /mina/sandbox/adc/ahc/mina3/src: main/java/org/apache/mina/core/AbstractIoFuture.java test/java/org/apache/mina/ test/java/org/apache/mina/core/ test/java/org/apache/mina/core/AbstractIoFutureTest.java

Author: adc
Date: Sun Jul 31 17:27:58 2011
New Revision: 1152599

URL: http://svn.apache.org/viewvc?rev=1152599&view=rev
Log:
Ported AbstractIoFuture back to include in my ongoing scratchpad

Added:
    mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java
    mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/
    mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/
    mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java

Added: mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java?rev=1152599&view=auto
==============================================================================
--- mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java (added)
+++ mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java Sun Jul 31 17:27:58 2011
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.core;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An abstract implementation of {@link IoFuture}.  Owners of this future
+ * must implement {@link #cancelOwner(boolean)} to receive notifications of
+ * when the future should be canceled.
+ * <p/>
+ * Concrete implementations of this abstract class should consider overriding
+ * the two methods {@link #scheduleResult(IoFutureListener, Object)}
+ * and {@link #scheduleException(IoFutureListener, Throwable)}
+ * so that listeners are called in a separate thread.  The default
+ * implementations may end up calling the listener in the same thread that is
+ * registering the listener, before the registration has completed.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public abstract class AbstractIoFuture<V> implements IoFuture<V> {
+
+    static final Logger LOG = LoggerFactory.getLogger(AbstractIoFuture.class);
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private final List<IoFutureListener<V>> listeners = new ArrayList<IoFutureListener<V>>();
+    private final AtomicReference<Object> result = new AtomicReference<Object>();
+
+    /**
+     * {@inheritDoc}
+     */
+    @SuppressWarnings({"unchecked"})
+    public IoFuture<V> register(IoFutureListener<V> listener) {
+
+        LOG.debug("registering listener {}", listener);
+
+        synchronized (latch) {
+            if (!isDone()) {
+                LOG.debug("future is not done, adding listener to listener set");
+                listeners.add(listener);
+                listener = null;
+            }
+        }
+
+        if (listener != null) {
+            LOG.debug("future is done calling listener");
+            Object object = result.get();
+            if (object instanceof Throwable) {
+                scheduleException(listener, (Throwable) object);
+            } else {
+                scheduleResult(listener, (V) object);
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean cancel(boolean mayInterruptIfRunning) {
+
+        LOG.debug("Attempting to cancel");
+
+        CancellationException ce = null;
+        synchronized (latch) {
+            if (!isCancelled() && !isDone() && cancelOwner(mayInterruptIfRunning)) {
+
+                LOG.debug("Successfully cancelled");
+
+                ce = new CancellationException();
+                result.set(ce);
+            } else {
+                LOG.debug("Unable to cancel");
+            }
+            latch.countDown();
+        }
+
+        if (ce != null) {
+            LOG.debug("Calling listeners");
+
+            for (IoFutureListener<V> listener : listeners) {
+                scheduleException(listener, ce);
+            }
+        }
+
+        return ce != null;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isCancelled() {
+        return result.get() instanceof CancellationException;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isDone() {
+        return latch.getCount() == 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @SuppressWarnings({"unchecked"})
+    public V get() throws InterruptedException, ExecutionException {
+
+        LOG.trace("Entering wait");
+        latch.await();
+        LOG.trace("Wait completed");
+
+        if (isCancelled()) throw new CancellationException();
+
+        Object object = result.get();
+        if (object instanceof ExecutionException) {
+            throw (ExecutionException) object;
+        } else {
+            return (V) object;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @SuppressWarnings({"unchecked"})
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+        LOG.trace("Entering wait");
+        if (!latch.await(timeout, unit)) throw new TimeoutException();
+        LOG.trace("Wait completed");
+
+        if (isCancelled()) throw new CancellationException();
+
+        Object object = result.get();
+        if (object instanceof ExecutionException) {
+            throw (ExecutionException) object;
+        } else {
+            return (V) object;
+        }
+    }
+
+    /**
+     * Notify the owner of this future that a client is attempting to cancel.
+     * This attempt will fail if the task has already completed, has already
+     * been cancelled, or could not be cancelled for some other reason. If
+     * successful, and this task has not started when <tt>cancel</tt> is called,
+     * this task should never run.  If the task has already started,
+     * then the <tt>mayInterruptIfRunning</tt> parameter determines
+     * whether the thread executing this task should be interrupted in
+     * an attempt to stop the task.
+     * <p/>
+     * <p>After this method returns, subsequent calls to {@link #isDone} will
+     * always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled}
+     * will always return <tt>true</tt> if this method returned <tt>true</tt>.
+     * <p/>
+     * <b>Note:</b> implementations must never throw an exception.
+     *
+     * @param mayInterruptIfRunning <tt>true</tt> if the owner executing this
+     *                              task should be interrupted; otherwise,
+     *                              in-progress tasks are allowed to complete
+     * @return <tt>false</tt> if the task could not be cancelled,
+     *         typically because it has already completed normally;
+     *         <tt>true</tt> otherwise
+     */
+    abstract protected boolean cancelOwner(boolean mayInterruptIfRunning);
+
+    /**
+     * Default implementation to call a listener's {@link IoFutureListener#completed(Object)}
+     * method.  Owners may override this method so that the listener is called
+     * from a thread pool.
+     *
+     * @param listener the listener to call
+     * @param result   the result to pass to the listener
+     */
+    protected void scheduleResult(IoFutureListener<V> listener, V result) {
+        LOG.debug("Calling the default result scheduler");
+        try {
+            listener.completed(result);
+        } catch (Throwable t) {
+            LOG.warn("Listener threw an exception", t);
+        }
+    }
+
+    /**
+     * Default implementation to call a listener's {@link IoFutureListener#exception(Throwable)}
+     * method.  Owners may override this method so that the listener is called
+     * from a thread pool.
+     *
+     * @param listener  the listener to call
+     * @param throwable the exception to pass to the listener
+     */
+    protected void scheduleException(IoFutureListener<V> listener, Throwable throwable) {
+        LOG.debug("Calling the default exception scheduler");
+        try {
+            listener.exception(throwable);
+        } catch (Throwable t) {
+            LOG.warn("Listener threw an exception", t);
+        }
+    }
+
+
+    /**
+     * Set the future result of the executing task.  Any {@link IoFutureListener}s
+     * are notified of the
+     *
+     * @param value the value returned by the executing task.
+     */
+    protected final void setResult(V value) {
+        assert !isDone();
+
+        synchronized (latch) {
+            result.set(value);
+            latch.countDown();
+        }
+
+        for (IoFutureListener<V> listener : listeners) {
+            scheduleResult(listener, value);
+        }
+
+        listeners.clear();
+    }
+
+    /**
+     * Set the future result as a {@link Throwable}, indicating that a
+     * throwable was thrown while executing the task.  This value is usually
+     * set by the future result owner.
+     * <p/>
+     * Any {@link IoFutureListener}s are notified of the exception.
+     *
+     * @param t the throwable that was thrown while executing the task.
+     */
+    protected final void setException(Throwable t) {
+        assert !isDone();
+
+        ExecutionException ee = new ExecutionException(t);
+
+        synchronized (latch) {
+            result.set(ee);
+            latch.countDown();
+        }
+
+        for (IoFutureListener<V> listener : listeners) {
+            scheduleException(listener, ee);
+        }
+
+        listeners.clear();
+    }
+}

Added: mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java
URL: http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java?rev=1152599&view=auto
==============================================================================
--- mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java (added)
+++ mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java Sun Jul 31 17:27:58 2011
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.core;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for {@link AbstractIoFuture}: completion with a value, completion
+ * with an exception, cancellation, timeouts and listener notification, both
+ * for listeners registered before and after the outcome is known.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class AbstractIoFutureTest {
+
+    /** How long the helper thread waits before completing the future. */
+    private static final long COMPLETION_DELAY_MILLIS = 100;
+
+    /** Upper bound for blocking gets, so that a regression fails the test instead of stalling the build. */
+    private static final long GET_TIMEOUT_SECONDS = 10;
+
+    @Test
+    public void testSet() {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+
+        assertPending(future);
+
+        future.setResult(true);
+
+        assertCompleted(future);
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testSetListeners() {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        future.register(listener);
+
+        assertPending(future);
+
+        future.setResult(true);
+
+        assertCompleted(future);
+
+        verify(listener).completed(true);
+        verify(listener, never()).exception(Matchers.<Throwable>any());
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testSetListenersAlreadySet() {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        assertPending(future);
+
+        future.setResult(true);
+        future.register(listener);
+
+        assertCompleted(future);
+
+        verify(listener).completed(true);
+        verify(listener, never()).exception(Matchers.<Throwable>any());
+    }
+
+    @Test
+    public void testTimedGet() throws Exception {
+        final MockAbstractIoFuture<Boolean> future = newFuture();
+
+        assertPending(future);
+
+        runLater(new Runnable() {
+            @Override
+            public void run() {
+                future.setResult(true);
+            }
+        });
+
+        // An unexpected interrupt, failure or timeout propagates and fails the test.
+        assertTrue(future.get(GET_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertCompleted(future);
+    }
+
+    @Test
+    public void testException() throws Exception {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+
+        assertPending(future);
+
+        future.setException(new NullPointerException());
+
+        assertCompleted(future);
+
+        try {
+            future.get();
+            fail("get() should have thrown an ExecutionException");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof NullPointerException);
+        }
+
+        assertCompleted(future);
+    }
+
+    @Test
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+    public void testExceptionListeners() {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        future.register(listener);
+
+        assertPending(future);
+
+        future.setException(new NullPointerException());
+
+        assertCompleted(future);
+
+        verify(listener).exception(argThat(matchesExecutionException()));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    @Test
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+    public void testExceptionListenersExceptionAlreadySet() {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        assertPending(future);
+
+        future.setException(new NullPointerException());
+        future.register(listener);
+
+        assertCompleted(future);
+
+        verify(listener).exception(argThat(matchesExecutionException()));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    @Test
+    public void testImmediateExceptionForTimedGet() throws Exception {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+
+        assertPending(future);
+
+        future.setException(new NullPointerException());
+
+        assertCompleted(future);
+
+        try {
+            future.get(GET_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            fail("get() should have thrown an ExecutionException");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof NullPointerException);
+        }
+
+        assertCompleted(future);
+    }
+
+    @Test
+    public void testTimedExceptionForTimedGet() throws Exception {
+        final MockAbstractIoFuture<Boolean> future = newFuture();
+
+        assertPending(future);
+
+        runLater(new Runnable() {
+            @Override
+            public void run() {
+                future.setException(new NullPointerException());
+            }
+        });
+
+        try {
+            future.get(GET_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            fail("get() should have thrown an ExecutionException");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof NullPointerException);
+        }
+
+        assertCompleted(future);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+
+        assertPending(future);
+        assertTrue(future.cancel(false));
+        assertCancelled(future);
+        // A second attempt must report failure without changing the state.
+        assertFalse(future.cancel(false));
+        assertCancelled(future);
+
+        try {
+            future.get();
+            fail("This future was canceled");
+        } catch (CancellationException ignore) {
+            // expected: a cancelled future reports the cancellation from get()
+        }
+    }
+
+    @Test
+    public void testCancelUncancelableOwner() throws Exception {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        doReturn(false).when(future).cancelOwner(anyBoolean());
+
+        assertPending(future);
+        assertFalse(future.cancel(false));
+        // The future is done after any cancel attempt, even an unsuccessful one.
+        assertCompleted(future);
+    }
+
+    @Test
+    public void testCancelFinishedFuture() throws Exception {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+
+        future.setResult(true);
+
+        assertCompleted(future);
+        assertFalse(future.cancel(false));
+        assertCompleted(future);
+
+        assertTrue(future.get());
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testCanceledListeners() {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        future.register(listener);
+
+        assertPending(future);
+
+        future.cancel(true);
+
+        assertCancelled(future);
+
+        // isA checks the runtime type, unlike any() which accepts every argument.
+        verify(listener).exception(Matchers.isA(CancellationException.class));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testCanceledListenersAlreadySet() {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        assertPending(future);
+
+        future.cancel(true);
+        future.register(listener);
+
+        assertCancelled(future);
+
+        // isA checks the runtime type, unlike any() which accepts every argument.
+        verify(listener).exception(Matchers.isA(CancellationException.class));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    @Test
+    public void testTimeout() throws Exception {
+        MockAbstractIoFuture<Boolean> future = newFuture();
+
+        assertPending(future);
+
+        try {
+            future.get(10, TimeUnit.MILLISECONDS);
+            fail("get() should have timed out");
+        } catch (TimeoutException expected) {
+            // expected: nothing ever completes this future
+        }
+    }
+
+    /** Creates a spied future so that individual tests can stub <tt>cancelOwner</tt>. */
+    private static MockAbstractIoFuture<Boolean> newFuture() {
+        return spy(new MockAbstractIoFuture<Boolean>());
+    }
+
+    /** Asserts that the future is neither done nor cancelled. */
+    private static void assertPending(AbstractIoFuture<?> future) {
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+    }
+
+    /** Asserts that the future is done without having been cancelled. */
+    private static void assertCompleted(AbstractIoFuture<?> future) {
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+    }
+
+    /** Asserts that the future is done because it was cancelled. */
+    private static void assertCancelled(AbstractIoFuture<?> future) {
+        assertTrue(future.isCancelled());
+        assertTrue(future.isDone());
+    }
+
+    /**
+     * Runs the given action on a new thread after {@link #COMPLETION_DELAY_MILLIS},
+     * giving the test thread time to block in <tt>get</tt> first.
+     */
+    private static void runLater(final Runnable action) {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(COMPLETION_DELAY_MILLIS);
+                    action.run();
+                } catch (InterruptedException e) {
+                    // Give up without running the action, but keep the interrupt visible.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }).start();
+    }
+
+    /** Matches an {@link ExecutionException} whose cause is a {@link NullPointerException}. */
+    private static Matcher<Throwable> matchesExecutionException() {
+        return new BaseMatcher<Throwable>() {
+            @Override
+            public boolean matches(Object item) {
+                return item instanceof ExecutionException && ((ExecutionException) item).getCause() instanceof NullPointerException;
+            }
+
+            @Override
+            public void describeTo(Description description) {
+                description.appendText("ExecutionException(NullPointerException)");
+            }
+        };
+    }
+
+    /**
+     * Minimal concrete future.  <tt>cancelOwner</tt> throws unless a test
+     * stubs it on the spy, so an unexpected cancellation attempt is noticed.
+     */
+    public static class MockAbstractIoFuture<V> extends AbstractIoFuture<V> {
+
+        @Override
+        protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}