You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ad...@apache.org on 2011/07/31 19:27:59 UTC
svn commit: r1152599 - in /mina/sandbox/adc/ahc/mina3/src:
main/java/org/apache/mina/core/AbstractIoFuture.java
test/java/org/apache/mina/ test/java/org/apache/mina/core/
test/java/org/apache/mina/core/AbstractIoFutureTest.java
Author: adc
Date: Sun Jul 31 17:27:58 2011
New Revision: 1152599
URL: http://svn.apache.org/viewvc?rev=1152599&view=rev
Log:
Ported AbstractIoFuture back to include in my ongoing scratchpad
Added:
mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/
mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java
Added: mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java?rev=1152599&view=auto
==============================================================================
--- mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java (added)
+++ mina/sandbox/adc/ahc/mina3/src/main/java/org/apache/mina/core/AbstractIoFuture.java Sun Jul 31 17:27:58 2011
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.core;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An abstract implementation of {@link IoFuture}. Owners of this future
+ * must implement {@link #cancelOwner(boolean)} to receive notifications of
+ * when the future should be canceled.
+ * <p/>
+ * Concrete implementations of this abstract class should consider overriding
+ * the two methods {@link #scheduleResult(IoFutureListener, Object)}
+ * and {@link #scheduleException(IoFutureListener, Throwable)}
+ * so that listeners are called in a separate thread. The default
+ * implementations may end up calling the listener in the same thread that is
+ * registering the listener, before the registration has completed.
+ * <p/>
+ * Completion state and the listener list are guarded by the monitor of the
+ * internal latch. Listeners are always invoked after that monitor has been
+ * released; {@link #cancelOwner(boolean)} is invoked while it is held.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public abstract class AbstractIoFuture<V> implements IoFuture<V> {
+
+    static final Logger LOG = LoggerFactory.getLogger(AbstractIoFuture.class);
+
+    /** Opened once the future is done; its monitor also guards completion and {@link #listeners}. */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Listeners still waiting to be notified; only accessed while holding the {@code latch} monitor. */
+    private final List<IoFutureListener<V>> listeners = new ArrayList<IoFutureListener<V>>();
+
+    /** The value, or the {@link ExecutionException} / {@link CancellationException} describing a failure. */
+    private final AtomicReference<Object> result = new AtomicReference<Object>();
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * If the future is already done the listener is notified immediately,
+     * through {@link #scheduleResult(IoFutureListener, Object)} or
+     * {@link #scheduleException(IoFutureListener, Throwable)}, instead of
+     * being stored.
+     */
+    @SuppressWarnings({"unchecked"})
+    public IoFuture<V> register(IoFutureListener<V> listener) {
+
+        LOG.debug("registering listener {}", listener);
+
+        synchronized (latch) {
+            if (!isDone()) {
+                LOG.debug("future is not done, adding listener to listener set");
+                listeners.add(listener);
+                return this;
+            }
+        }
+
+        if (listener != null) {
+            LOG.debug("future is done calling listener");
+            Object outcome = result.get();
+            // Use the same classification as get(): only the two wrapper
+            // exceptions stored by this class denote a failure. A value that
+            // merely happens to be a Throwable is still a regular result.
+            if (isFailure(outcome)) {
+                scheduleException(listener, (Throwable) outcome);
+            } else {
+                scheduleResult(listener, (V) outcome);
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * The future is marked as done when this method returns, even if the
+     * cancellation itself was refused by {@link #cancelOwner(boolean)}.
+     * Registered listeners are only notified, and released, when the
+     * cancellation succeeded.
+     */
+    public boolean cancel(boolean mayInterruptIfRunning) {
+
+        LOG.debug("Attempting to cancel");
+
+        CancellationException ce = null;
+        List<IoFutureListener<V>> toNotify = null;
+        synchronized (latch) {
+            if (!isCancelled() && !isDone() && cancelOwner(mayInterruptIfRunning)) {
+
+                LOG.debug("Successfully cancelled");
+
+                ce = new CancellationException();
+                result.set(ce);
+                // Take the listeners while still holding the lock so that a
+                // late completion by the owner cannot notify them again or
+                // mutate the list while it is being iterated.
+                toNotify = drainListeners();
+            } else {
+                LOG.debug("Unable to cancel");
+            }
+            latch.countDown();
+        }
+
+        if (ce != null) {
+            LOG.debug("Calling listeners");
+
+            for (IoFutureListener<V> listener : toNotify) {
+                scheduleException(listener, ce);
+            }
+        }
+
+        return ce != null;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isCancelled() {
+        return result.get() instanceof CancellationException;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isDone() {
+        return latch.getCount() == 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public V get() throws InterruptedException, ExecutionException {
+
+        LOG.trace("Entering wait");
+        latch.await();
+        LOG.trace("Wait completed");
+
+        return report();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+        LOG.trace("Entering wait");
+        if (!latch.await(timeout, unit)) {
+            throw new TimeoutException();
+        }
+        LOG.trace("Wait completed");
+
+        return report();
+    }
+
+    /**
+     * Notify the owner of this future that a client is attempting to cancel.
+     * This attempt will fail if the task has already completed, has already
+     * been cancelled, or could not be cancelled for some other reason. If
+     * successful, and this task has not started when <tt>cancel</tt> is called,
+     * this task should never run. If the task has already started,
+     * then the <tt>mayInterruptIfRunning</tt> parameter determines
+     * whether the thread executing this task should be interrupted in
+     * an attempt to stop the task.
+     * <p/>
+     * <p>After this method returns, subsequent calls to {@link #isDone} will
+     * always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
+     * will always return <tt>true</tt> if this method returned <tt>true</tt>.
+     * <p/>
+     * <b>Note:</b> implementations must never throw an exception. This method
+     * is called while the internal lock of the future is held, so it should
+     * return quickly.
+     *
+     * @param mayInterruptIfRunning <tt>true</tt> if the owner executing this
+     *                              task should be interrupted; otherwise,
+     *                              in-progress tasks are allowed to complete
+     * @return <tt>false</tt> if the task could not be cancelled,
+     *         typically because it has already completed normally;
+     *         <tt>true</tt> otherwise
+     */
+    protected abstract boolean cancelOwner(boolean mayInterruptIfRunning);
+
+    /**
+     * Default implementation to call a listener's {@link IoFutureListener#completed(Object)}
+     * method. Owners may override this method so that the listener is called
+     * from a thread pool. Anything thrown by the listener is logged and
+     * otherwise ignored.
+     *
+     * @param listener the listener to call
+     * @param result   the result to pass to the listener
+     */
+    protected void scheduleResult(IoFutureListener<V> listener, V result) {
+        LOG.debug("Calling the default result scheduler");
+        try {
+            listener.completed(result);
+        } catch (Throwable t) {
+            LOG.warn("Listener threw an exception", t);
+        }
+    }
+
+    /**
+     * Default implementation to call a listener's {@link IoFutureListener#exception(Throwable)}
+     * method. Owners may override this method so that the listener is called
+     * from a thread pool. Anything thrown by the listener is logged and
+     * otherwise ignored.
+     *
+     * @param listener  the listener to call
+     * @param throwable the exception to pass to the listener
+     */
+    protected void scheduleException(IoFutureListener<V> listener, Throwable throwable) {
+        LOG.debug("Calling the default exception scheduler");
+        try {
+            listener.exception(throwable);
+        } catch (Throwable t) {
+            LOG.warn("Listener threw an exception", t);
+        }
+    }
+
+    /**
+     * Set the future result of the executing task. Any {@link IoFutureListener}s
+     * are notified of the result and then released.
+     * <p/>
+     * If the future has already been successfully cancelled the value is
+     * dropped, so that {@link #isCancelled()} keeps returning <tt>true</tt>
+     * and listeners are not notified a second time.
+     *
+     * @param value the value returned by the executing task.
+     */
+    protected final void setResult(V value) {
+        // Catches double completion during development; the check under the
+        // lock below is what protects production runs and racing threads.
+        assert !isDone();
+
+        List<IoFutureListener<V>> toNotify;
+        synchronized (latch) {
+            if (isCancelled()) {
+                LOG.debug("Future was already cancelled, ignoring result");
+                return;
+            }
+            result.set(value);
+            latch.countDown();
+            toNotify = drainListeners();
+        }
+
+        for (IoFutureListener<V> listener : toNotify) {
+            scheduleResult(listener, value);
+        }
+    }
+
+    /**
+     * Set the future result as a {@link Throwable}, indicating that a
+     * throwable was thrown while executing the task. This value is usually
+     * set by the future result owner.
+     * <p/>
+     * The throwable is wrapped in an {@link ExecutionException}; any
+     * {@link IoFutureListener}s are notified of that exception and then
+     * released.
+     * <p/>
+     * If the future has already been successfully cancelled the throwable is
+     * dropped, so that {@link #isCancelled()} keeps returning <tt>true</tt>
+     * and listeners are not notified a second time.
+     *
+     * @param t the throwable that was thrown while executing the task.
+     */
+    protected final void setException(Throwable t) {
+        // Catches double completion during development; the check under the
+        // lock below is what protects production runs and racing threads.
+        assert !isDone();
+
+        ExecutionException ee = new ExecutionException(t);
+
+        List<IoFutureListener<V>> toNotify;
+        synchronized (latch) {
+            if (isCancelled()) {
+                LOG.debug("Future was already cancelled, ignoring exception", t);
+                return;
+            }
+            result.set(ee);
+            latch.countDown();
+            toNotify = drainListeners();
+        }
+
+        for (IoFutureListener<V> listener : toNotify) {
+            scheduleException(listener, ee);
+        }
+    }
+
+    /**
+     * Translates the stored outcome into the return value or exception
+     * expected from the <tt>get</tt> methods. Must only be called once the
+     * latch has been released.
+     *
+     * @return the value stored by {@link #setResult(Object)}
+     * @throws CancellationException if the future was cancelled
+     * @throws ExecutionException    if the owner reported a failure
+     */
+    @SuppressWarnings({"unchecked"})
+    private V report() throws ExecutionException {
+        Object outcome = result.get();
+        if (outcome instanceof CancellationException) {
+            throw new CancellationException();
+        }
+        if (outcome instanceof ExecutionException) {
+            throw (ExecutionException) outcome;
+        }
+        return (V) outcome;
+    }
+
+    /**
+     * Removes and returns all currently registered listeners. The caller
+     * must hold the monitor of {@link #latch}.
+     *
+     * @return a private copy of the listeners that were registered
+     */
+    private List<IoFutureListener<V>> drainListeners() {
+        List<IoFutureListener<V>> drained = new ArrayList<IoFutureListener<V>>(listeners);
+        listeners.clear();
+        return drained;
+    }
+
+    /**
+     * Tells whether a stored outcome is one of the exception wrappers this
+     * class uses to record a failure or a cancellation.
+     *
+     * @param outcome the object read from {@link #result}, may be <tt>null</tt>
+     * @return <tt>true</tt> if the outcome must be reported as an exception
+     */
+    private static boolean isFailure(Object outcome) {
+        return outcome instanceof ExecutionException || outcome instanceof CancellationException;
+    }
+}
Added: mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java
URL: http://svn.apache.org/viewvc/mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java?rev=1152599&view=auto
==============================================================================
--- mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java (added)
+++ mina/sandbox/adc/ahc/mina3/src/test/java/org/apache/mina/core/AbstractIoFutureTest.java Sun Jul 31 17:27:58 2011
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.core;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for {@link AbstractIoFuture}, covering completion with a value,
+ * completion with an exception, cancellation, timeouts and the notification
+ * of listeners registered before and after the future is done.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class AbstractIoFutureTest {
+
+    /** Setting a result marks the future as done but not cancelled. */
+    @Test
+    public void testSet() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.setResult(true);
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+    }
+
+    /** A listener registered before completion receives the result. */
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testSetListeners() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        future.register(listener);
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.setResult(true);
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+
+        verify(listener).completed(true);
+        verify(listener, never()).exception(Matchers.<Throwable>any());
+    }
+
+    /** A listener registered after completion is notified immediately. */
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testSetListenersAlreadySet() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.setResult(true);
+        future.register(listener);
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+
+        verify(listener).completed(true);
+        verify(listener, never()).exception(Matchers.<Throwable>any());
+    }
+
+    /** A timed get returns the value set by another thread while waiting. */
+    @Test
+    public void testTimedGet() {
+        final MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                    future.setResult(true);
+                } catch (InterruptedException ignored) {
+                    // nothing interrupts this helper thread
+                }
+            }
+        }).start();
+
+        try {
+            assertTrue(future.get(1, TimeUnit.DAYS));
+            assertFalse(future.isCancelled());
+            assertTrue(future.isDone());
+        } catch (InterruptedException e) {
+            fail("This future was not interrupted");
+        } catch (ExecutionException ee) {
+            fail("This future did not have an execution exception");
+        } catch (TimeoutException e) {
+            fail("This future did not time out");
+        }
+    }
+
+    /** An exception set by the owner is rethrown by get() wrapped in an ExecutionException. */
+    @Test
+    public void testException() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.setException(new NullPointerException());
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+
+        try {
+            future.get();
+            fail("This future had an execution exception");
+        } catch (InterruptedException e) {
+            fail("This future was not interrupted");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof NullPointerException);
+        }
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+    }
+
+    /** A listener registered before a failure receives the wrapping ExecutionException. */
+    @Test
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+    public void testExceptionListeners() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        future.register(listener);
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.setException(new NullPointerException());
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+
+        verify(listener).exception(argThat(matchesExecutionException()));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    /** A listener registered after a failure is notified immediately with the ExecutionException. */
+    @Test
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+    public void testExceptionListenersExceptionAlreadySet() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.setException(new NullPointerException());
+        future.register(listener);
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+
+        verify(listener).exception(argThat(matchesExecutionException()));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    /** A timed get on an already failed future throws without waiting. */
+    @Test
+    public void testImmediateExceptionForTimedGet() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.setException(new NullPointerException());
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+
+        try {
+            future.get(1, TimeUnit.DAYS);
+            fail("This future had an execution exception");
+        } catch (InterruptedException e) {
+            fail("This future was not interrupted");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof NullPointerException);
+        } catch (TimeoutException e) {
+            fail("This future did not time out");
+        }
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+    }
+
+    /** A timed get throws the exception set by another thread while waiting. */
+    @Test
+    public void testTimedExceptionForTimedGet() {
+        final MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                    future.setException(new NullPointerException());
+                } catch (InterruptedException ignored) {
+                    // nothing interrupts this helper thread
+                }
+            }
+        }).start();
+
+        try {
+            future.get(1, TimeUnit.DAYS);
+            // Reaching this line means no exception was reported at all,
+            // which must not let the test pass silently.
+            fail("This future had an execution exception");
+        } catch (InterruptedException e) {
+            fail("This future was not interrupted");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof NullPointerException);
+        } catch (TimeoutException e) {
+            fail("This future did not time out");
+        }
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+    }
+
+    /** A successful cancel can only happen once and makes get() throw. */
+    @Test
+    public void testCancel() throws Exception {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+        assertTrue(future.cancel(false));
+        assertTrue(future.isCancelled());
+        assertTrue(future.isDone());
+        assertFalse(future.cancel(false));
+        assertTrue(future.isCancelled());
+        assertTrue(future.isDone());
+
+        try {
+            future.get();
+            fail("This future was canceled");
+        } catch (CancellationException ignore) {
+            // expected: the future was cancelled
+        }
+    }
+
+    /** When the owner refuses to cancel, the future is done but not cancelled. */
+    @Test
+    public void testCancelUncancelableOwner() throws Exception {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        doReturn(false).when(future).cancelOwner(anyBoolean());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+        assertFalse(future.cancel(false));
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+    }
+
+    /** Cancelling an already completed future fails and keeps its result. */
+    @Test
+    public void testCancelFinishedFuture() throws Exception {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+
+        future.setResult(true);
+
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+        assertFalse(future.cancel(false));
+        assertFalse(future.isCancelled());
+        assertTrue(future.isDone());
+
+        assertTrue(future.get());
+    }
+
+    /** A listener registered before cancellation receives a CancellationException. */
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testCanceledListeners() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        future.register(listener);
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.cancel(true);
+
+        assertTrue(future.isCancelled());
+        assertTrue(future.isDone());
+
+        // isA checks the runtime type; any() would also accept null or any other throwable.
+        verify(listener).exception(Matchers.isA(CancellationException.class));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    /** A listener registered after cancellation is notified immediately with a CancellationException. */
+    @Test
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    public void testCanceledListenersAlreadySet() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+        doReturn(true).when(future).cancelOwner(anyBoolean());
+        IoFutureListener<Boolean> listener = mock(IoFutureListener.class);
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        future.cancel(true);
+        future.register(listener);
+
+        assertTrue(future.isCancelled());
+        assertTrue(future.isDone());
+
+        // isA checks the runtime type; any() would also accept null or any other throwable.
+        verify(listener).exception(Matchers.isA(CancellationException.class));
+        verify(listener, never()).completed(Matchers.<Boolean>any());
+    }
+
+    /** A timed get on a future that never completes throws a TimeoutException. */
+    @Test
+    public void testTimeout() {
+        MockAbstractIoFuture<Boolean> future = spy(new MockAbstractIoFuture<Boolean>());
+
+        assertFalse(future.isCancelled());
+        assertFalse(future.isDone());
+
+        try {
+            future.get(10, TimeUnit.MILLISECONDS);
+            fail("This future has timed out");
+        } catch (InterruptedException e) {
+            fail("This future was not interrupted");
+        } catch (ExecutionException ee) {
+            fail("This future did not have an execution exception");
+        } catch (TimeoutException e) {
+            // expected: nothing ever completes this future
+        }
+    }
+
+    /**
+     * Builds a matcher that accepts an {@link ExecutionException} whose cause
+     * is a {@link NullPointerException}.
+     *
+     * @return the matcher used to verify exceptions passed to listeners
+     */
+    private static Matcher<Throwable> matchesExecutionException() {
+        return new BaseMatcher<Throwable>() {
+            @Override
+            public boolean matches(Object item) {
+                return item instanceof ExecutionException && ((ExecutionException) item).getCause() instanceof NullPointerException;
+            }
+
+            @Override
+            public void describeTo(Description description) {
+                description.appendText("ExecutionException(NullPointerException)");
+            }
+        };
+    }
+
+    /**
+     * Minimal concrete future used by the tests. Its {@code cancelOwner}
+     * throws unless a test stubs it on the spy.
+     */
+    public static class MockAbstractIoFuture<V> extends AbstractIoFuture<V> {
+
+        @Override
+        protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}