You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/02/13 18:58:58 UTC

svn commit: r1445775 - in /qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src: main/java/org/apache/qpid/server/configuration/updater/ test/java/org/apache/qpid/server/configuration/updater/

Author: orudyy
Date: Wed Feb 13 17:58:58 2013
New Revision: 1445775

URL: http://svn.apache.org/r1445775
Log:
QPID-4390: Add initial implementation of task executor

Added:
    qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/
    qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
    qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/updater/
    qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java

Added: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java?rev=1445775&view=auto
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java (added)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java Wed Feb 13 17:58:58 2013
@@ -0,0 +1,324 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration.updater;
+
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.security.SecurityManager;
+
+public class TaskExecutor
+{
+    /** Name given to every thread created for executing submitted tasks. */
+    private static final String TASK_EXECUTION_THREAD_NAME = "Broker-Configuration-Thread";
+    private static final Logger LOGGER = Logger.getLogger(TaskExecutor.class);
+
+    /** Thread most recently created by the executor's thread factory; set back to null when the executor is stopped. */
+    private volatile Thread _taskThread;
+    /** Lifecycle state; transitions are made with compareAndSet so that start/stop act at most once. */
+    private final AtomicReference<State> _state;
+    /** Backing executor service; assigned in start() and set back to null when the executor is stopped. */
+    private volatile ExecutorService _executor;
+
+    /**
+     * Creates a task executor in the {@link State#INITIALISING} state.
+     * No thread pool is created until {@link #start()} is called.
+     */
+    public TaskExecutor()
+    {
+        _state = new AtomicReference<State>(State.INITIALISING);
+    }
+
+    /**
+     * Returns the current lifecycle state of this executor.
+     *
+     * @return the value currently held by the state reference
+     */
+    public State getState()
+    {
+        return _state.get();
+    }
+
+    /**
+     * Moves the executor from {@link State#INITIALISING} to {@link State#ACTIVE} and
+     * creates the single-threaded pool on which submitted tasks are run.
+     * The call has no effect when the executor is in any other state.
+     */
+    public void start()
+    {
+        if (!_state.compareAndSet(State.INITIALISING, State.ACTIVE))
+        {
+            return;
+        }
+
+        LOGGER.debug("Starting task executor");
+
+        // The factory remembers the thread it hands to the pool so that
+        // isTaskExecutorThread() can recognise calls made from that thread.
+        ThreadFactory taskThreadFactory = new ThreadFactory()
+        {
+            @Override
+            public Thread newThread(Runnable r)
+            {
+                _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME);
+                return _taskThread;
+            }
+        };
+        _executor = Executors.newFixedThreadPool(1, taskThreadFactory);
+
+        LOGGER.debug("Task executor is started");
+    }
+
+    public void stopImmediately()
+    {
+        if (_state.compareAndSet(State.ACTIVE, State.STOPPED))
+        {
+            ExecutorService executor = _executor;
+            if (executor != null)
+            {
+                LOGGER.debug("Stopping task executor immediately");
+                List<Runnable> cancelledTasks = executor.shutdownNow();
+                if (cancelledTasks != null)
+                {
+                    for (Runnable runnable : cancelledTasks)
+                    {
+                        if (runnable instanceof RunnableFuture<?>)
+                        {
+                            ((RunnableFuture<?>) runnable).cancel(true);
+                        }
+                    }
+                }
+                _executor = null;
+                _taskThread = null;
+                LOGGER.debug("Task executor was stopped immediately. Number of unfinished tasks: " + cancelledTasks.size());
+            }
+        }
+    }
+
+    /**
+     * Moves the executor from {@link State#ACTIVE} to {@link State#STOPPED} and asks the
+     * underlying pool to shut down via {@link ExecutorService#shutdown()}.
+     * The call has no effect when the executor is not ACTIVE.
+     */
+    public void stop()
+    {
+        if (!_state.compareAndSet(State.ACTIVE, State.STOPPED))
+        {
+            return;
+        }
+
+        ExecutorService runningExecutor = _executor;
+        if (runningExecutor == null)
+        {
+            return;
+        }
+
+        LOGGER.debug("Stopping task executor");
+        runningExecutor.shutdown();
+        _executor = null;
+        _taskThread = null;
+        LOGGER.debug("Task executor is stopped");
+    }
+
+    /**
+     * Submits a task for execution on the task thread.
+     * <p>
+     * When called from the task thread itself the task is run synchronously and an
+     * already-completed future is returned; in that case a failure of the task is
+     * thrown from this method rather than reported through the future.
+     * Otherwise the task is wrapped and queued on the executor service.
+     *
+     * @param task the task to execute
+     * @return a future giving access to the task result
+     * @throws IllegalStateException if the executor is not in the ACTIVE state
+     */
+    Future<?> submit(Callable<?> task)
+    {
+        checkState();
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Submitting task: " + task);
+        }
+        if (isTaskExecutorThread())
+        {
+            Object result = executeTaskAndHandleExceptions(task);
+            return new ImmediateFuture(result);
+        }
+
+        // Read the volatile field once: the executor may not be assigned yet, or may
+        // already have been cleared by a concurrent stop, even though checkState() passed.
+        ExecutorService executor = _executor;
+        if (executor == null)
+        {
+            throw new IllegalStateException("Task executor is not in ACTIVE state");
+        }
+        return executor.submit(new CallableWrapper(task));
+    }
+
+    /**
+     * Submits a task and blocks until its result is available.
+     *
+     * @param task the task to execute
+     * @return the value returned by the task
+     * @throws CancellationException if the task was cancelled before it completed
+     * @throws IllegalStateException if the executor is not in the ACTIVE state
+     * @throws RuntimeException the task's own runtime exception, or a wrapper around
+     *         a checked exception thrown by the task or around the interruption of
+     *         the waiting thread
+     */
+    public Object submitAndWait(Callable<?> task) throws CancellationException
+    {
+        try
+        {
+            Future<?> future = submit(task);
+            return future.get();
+        }
+        catch (InterruptedException e)
+        {
+            // Future.get() cleared the interrupt status when it threw; restore it so
+            // that code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Task execution was interrupted: " + task, e);
+        }
+        catch (ExecutionException e)
+        {
+            Throwable cause = e.getCause();
+            if (cause instanceof RuntimeException)
+            {
+                throw (RuntimeException) cause;
+            }
+            if (cause instanceof Error)
+            {
+                throw (Error) cause;
+            }
+            // Checked exceptions and any other throwable are wrapped, keeping the cause.
+            throw new RuntimeException("Failed to execute user task: " + task, cause);
+        }
+    }
+
+    /**
+     * Tells whether the calling thread is the thread recorded as the task thread.
+     *
+     * @return true if the current thread is the thread most recently created by the
+     *         executor's thread factory, false otherwise
+     */
+    public boolean isTaskExecutorThread()
+    {
+        return Thread.currentThread() == _taskThread;
+    }
+
+    /**
+     * Verifies that the executor is currently in the ACTIVE state.
+     *
+     * @throws IllegalStateException if the state is anything other than ACTIVE
+     */
+    private void checkState()
+    {
+        if (_state.get() != State.ACTIVE)
+        {
+            throw new IllegalStateException("Task executor is not in ACTIVE state");
+        }
+    }
+
+    /**
+     * Runs the task on the calling thread, converting checked exceptions into
+     * runtime exceptions.
+     *
+     * @param userTask the task to run
+     * @return the value returned by the task
+     * @throws RuntimeException the task's own runtime exception unchanged, or a
+     *         wrapper whose cause is the checked exception thrown by the task
+     */
+    private Object executeTaskAndHandleExceptions(Callable<?> userTask)
+    {
+        try
+        {
+            return executeTask(userTask);
+        }
+        catch (RuntimeException unchecked)
+        {
+            throw unchecked;
+        }
+        catch (Exception checked)
+        {
+            throw new RuntimeException("Failed to execute user task: " + userTask, checked);
+        }
+    }
+
+    /**
+     * Invokes the task on the calling thread, logging before and after the call
+     * when debug logging is enabled.
+     *
+     * @param userTask the task to invoke
+     * @return the value returned by the task
+     * @throws Exception whatever the task throws; the "performed successfully"
+     *         message is not logged in that case
+     */
+    private Object executeTask(Callable<?> userTask) throws Exception
+    {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Performing task " + userTask);
+        }
+        Object result = userTask.call();
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Task " + userTask + " is performed successfully with result:" + result);
+        }
+        return result;
+    }
+
+    /**
+     * Wraps a user task so that it runs with the security manager subject, log actor
+     * and access-control-context subject that were current on the thread which
+     * created the wrapper.
+     */
+    private class CallableWrapper implements Callable<Object>
+    {
+        private final Callable<?> _userTask;
+        private final Subject _securityManagerSubject;
+        private final LogActor _actor;
+        private final Subject _contextSubject;
+
+        /**
+         * Captures the calling thread's context together with the task.
+         *
+         * @param userWork the task to run later
+         */
+        public CallableWrapper(Callable<?> userWork)
+        {
+            _userTask = userWork;
+            _securityManagerSubject = SecurityManager.getThreadSubject();
+            _actor = CurrentActor.get();
+            _contextSubject = Subject.getSubject(AccessController.getContext());
+        }
+
+        /**
+         * Installs the captured context on the executing thread, runs the task as
+         * the captured context subject and finally clears the installed context.
+         *
+         * @return the value returned by the user task
+         * @throws Exception the exception thrown by the user task, unwrapped from
+         *         the PrivilegedActionException raised by Subject.doAs
+         */
+        @Override
+        public Object call() throws Exception
+        {
+            SecurityManager.setThreadSubject(_securityManagerSubject);
+            CurrentActor.set(_actor);
+
+            try
+            {
+                return Subject.doAs(_contextSubject, new PrivilegedExceptionAction<Object>()
+                {
+                    @Override
+                    public Object run() throws Exception
+                    {
+                        return executeTask(_userTask);
+                    }
+                });
+            }
+            catch (PrivilegedActionException wrapped)
+            {
+                throw wrapped.getException();
+            }
+            finally
+            {
+                clearThreadContext();
+            }
+        }
+
+        /**
+         * Removes the current actor and nulls the security manager subject; a failure
+         * of either step is logged and does not prevent the other from running.
+         */
+        private void clearThreadContext()
+        {
+            try
+            {
+                CurrentActor.remove();
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Unxpected exception on current actor removal", e);
+            }
+            try
+            {
+                SecurityManager.setThreadSubject(null);
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Unxpected exception on nullifying of subject for a security manager", e);
+            }
+        }
+    }
+
+    /**
+     * A future that is already complete when it is created: it simply hands back
+     * the result it was constructed with and can never be cancelled.
+     */
+    private class ImmediateFuture implements Future<Object>
+    {
+        private final Object _result;
+
+        /**
+         * @param result the value every call to get() will return
+         */
+        public ImmediateFuture(Object result)
+        {
+            _result = result;
+        }
+
+        /** Always returns false: the computation has already finished. */
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            return false;
+        }
+
+        /** Always returns false: this future cannot be cancelled. */
+        @Override
+        public boolean isCancelled()
+        {
+            return false;
+        }
+
+        /** Always returns true: the result is available from construction. */
+        @Override
+        public boolean isDone()
+        {
+            return true;
+        }
+
+        /** Returns the stored result without blocking. */
+        @Override
+        public Object get()
+        {
+            return _result;
+        }
+
+        /** Returns the stored result without blocking; the timeout is ignored. */
+        @Override
+        public Object get(long timeout, TimeUnit unit)
+        {
+            return _result;
+        }
+    }
+}

Added: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java?rev=1445775&view=auto
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java (added)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java Wed Feb 13 17:58:58 2013
@@ -0,0 +1,296 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration.updater;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.security.SecurityManager;
+
+public class TaskExecutorTest extends TestCase
+{
+    private TaskExecutor _executor;
+
+    /** Creates a fresh, not yet started executor for each test. */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _executor = new TaskExecutor();
+    }
+
+    /** Stops the executor after each test; super.tearDown() runs even if stopping fails. */
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            _executor.stopImmediately();
+        }
+        finally
+        {
+            super.tearDown();
+        }
+    }
+
+    /** A newly constructed executor reports the INITIALISING state. */
+    public void testGetState()
+    {
+        assertEquals("Unxpected initial state", State.INITIALISING, _executor.getState());
+    }
+
+    /** Starting the executor moves it into the ACTIVE state. */
+    public void testStart()
+    {
+        _executor.start();
+        assertEquals("Unxpected started state", State.ACTIVE, _executor.getState());
+    }
+
+    /**
+     * Submits two never-ending tasks from two separate threads, stops the executor
+     * immediately and verifies that both waiting submitters are released: one with a
+     * CancellationException and the other with an InterruptedException.
+     */
+    public void testStopImmediately() throws Exception
+    {
+        _executor.start();
+        // counted down once by each submitting thread after its submit() call returns
+        final CountDownLatch submitLatch = new CountDownLatch(2);
+        // counted down by NeverEndingCallable when its call() method is entered
+        final CountDownLatch waitForCallLatch = new CountDownLatch(1);
+        // collects the exception each submitting thread receives from Future.get()
+        final BlockingQueue<Exception> submitExceptions = new LinkedBlockingQueue<Exception>();
+
+        Runnable runnable = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    Future<?> f = _executor.submit(new NeverEndingCallable(waitForCallLatch));
+                    submitLatch.countDown();
+                    f.get();
+                }
+                catch (Exception e)
+                {
+                    // report the task's own failure rather than the ExecutionException wrapper
+                    if (e instanceof ExecutionException)
+                    {
+                        e = (Exception) e.getCause();
+                    }
+                    submitExceptions.add(e);
+                }
+            }
+        };
+        new Thread(runnable).start();
+        new Thread(runnable).start();
+        // make sure both tasks are submitted and at least one is running before stopping
+        assertTrue("Tasks have not been submitted", submitLatch.await(1000, TimeUnit.MILLISECONDS));
+        assertTrue("The first task has not been triggered", waitForCallLatch.await(1000, TimeUnit.MILLISECONDS));
+
+        _executor.stopImmediately();
+        assertEquals("Unxpected stopped state", State.STOPPED, _executor.getState());
+
+        Exception e = submitExceptions.poll(1000l, TimeUnit.MILLISECONDS);
+        assertNotNull("The task execution was not interrupted or cancelled", e);
+        Exception e2 = submitExceptions.poll(1000l, TimeUnit.MILLISECONDS);
+        assertNotNull("The task execution was not interrupted or cancelled", e2);
+
+        // the order in which the two submitters report is not fixed, so check both ways
+        assertTrue("One of the exceptions should be CancellationException:", e2 instanceof CancellationException
+                || e instanceof CancellationException);
+        assertTrue("One of the exceptions should be InterruptedException:", e2 instanceof InterruptedException
+                || e instanceof InterruptedException);
+    }
+
+    /** Stopping a started executor moves it into the STOPPED state. */
+    public void testStop()
+    {
+        _executor.start();
+        _executor.stop();
+        assertEquals("Unxpected stopped state", State.STOPPED, _executor.getState());
+    }
+
+    /** submitAndWait returns the value produced by the submitted task. */
+    public void testSubmitAndWait() throws Exception
+    {
+        _executor.start();
+        Callable<String> doneTask = new Callable<String>()
+        {
+            @Override
+            public String call() throws Exception
+            {
+                return "DONE";
+            }
+        };
+        Object taskResult = _executor.submitAndWait(doneTask);
+        assertEquals("Unexpected task execution result", "DONE", taskResult);
+    }
+
+    /** A task submitted outside of any Subject.doAs call sees no subject in its access control context. */
+    public void testSubmitAndWaitInNotAuthorizedContext()
+    {
+        _executor.start();
+        Object subject = _executor.submitAndWait(new SubjectRetriever());
+        assertNull("Subject must be null", subject);
+    }
+
+    /** A task submitted from within Subject.doAs sees the same subject when it is executed. */
+    public void testSubmitAndWaitInAuthorizedContext()
+    {
+        _executor.start();
+        Subject subject = new Subject();
+        PrivilegedAction<Object> submitRetriever = new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                return _executor.submitAndWait(new SubjectRetriever());
+            }
+        };
+        Object retrieved = Subject.doAs(subject, submitRetriever);
+        assertEquals("Unexpected subject", subject, retrieved);
+    }
+
+    /** A task submitted from within Subject.doAs(null, ...) sees no subject when it is executed. */
+    public void testSubmitAndWaitInAuthorizedContextWithNullSubject()
+    {
+        _executor.start();
+        PrivilegedAction<Object> submitRetriever = new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                return _executor.submitAndWait(new SubjectRetriever());
+            }
+        };
+        Object retrieved = Subject.doAs(null, submitRetriever);
+        assertEquals("Unexpected subject", null, retrieved);
+    }
+
+    /** A runtime exception thrown by the task reaches the caller of submitAndWait unchanged. */
+    public void testSubmitAndWaitReThrowsOriginalRuntimeException()
+    {
+        final RuntimeException exception = new RuntimeException();
+        Callable<Void> failingTask = new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                throw exception;
+            }
+        };
+        _executor.start();
+        try
+        {
+            _executor.submitAndWait(failingTask);
+            fail("Exception is expected");
+        }
+        catch (Exception thrown)
+        {
+            assertEquals("Unexpected exception", exception, thrown);
+        }
+    }
+
+    /** A checked exception thrown by the task reaches the caller as the cause of the thrown exception. */
+    public void testSubmitAndWaitPassesOriginalCheckedException()
+    {
+        final Exception exception = new Exception();
+        Callable<Void> failingTask = new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                throw exception;
+            }
+        };
+        _executor.start();
+        try
+        {
+            _executor.submitAndWait(failingTask);
+            fail("Exception is expected");
+        }
+        catch (Exception thrown)
+        {
+            assertEquals("Unexpected exception", exception, thrown.getCause());
+        }
+    }
+
+    /**
+     * The log actor and security manager subject that are current on the submitting
+     * thread must be the ones observed by the task while it executes.
+     */
+    public void testSubmitAndWaitCurrentActorAndSecurityManagerSubjectAreRespected() throws Exception
+    {
+        _executor.start();
+        LogActor actor = new TestLogActor(new NullRootMessageLogger());
+        Subject subject = new Subject();
+        // remembered so the test thread's subject can be restored afterwards
+        Subject currentSecurityManagerSubject = SecurityManager.getThreadSubject();
+        // holders for the values the task observes while it runs
+        final AtomicReference<LogActor> taskLogActor = new AtomicReference<LogActor>();
+        final AtomicReference<Subject> taskSubject = new AtomicReference<Subject>();
+        try
+        {
+            CurrentActor.set(actor);
+            SecurityManager.setThreadSubject(subject);
+            _executor.submitAndWait(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    taskLogActor.set(CurrentActor.get());
+                    taskSubject.set(SecurityManager.getThreadSubject());
+                    return null;
+                }
+            });
+        }
+        finally
+        {
+            // undo the changes made to the test thread's context above
+            SecurityManager.setThreadSubject(currentSecurityManagerSubject);
+            CurrentActor.remove();
+        }
+        assertEquals("Unexpected task log actor", actor, taskLogActor.get());
+        assertEquals("Unexpected security manager subject", subject, taskSubject.get());
+    }
+
+    /** Task returning the subject associated with the access control context it is executed in. */
+    private class SubjectRetriever implements Callable<Subject>
+    {
+        @Override
+        public Subject call() throws Exception
+        {
+            return Subject.getSubject(AccessController.getContext());
+        }
+    }
+
+    /**
+     * Task that signals when it has started and then blocks until its thread is
+     * interrupted; it never returns normally.
+     */
+    private class NeverEndingCallable implements Callable<Void>
+    {
+        private final CountDownLatch _waitLatch;
+
+        /**
+         * @param waitLatch latch counted down when call() is entered; may be null
+         */
+        public NeverEndingCallable(CountDownLatch waitLatch)
+        {
+            _waitLatch = waitLatch;
+        }
+
+        /**
+         * Counts down the latch (if any) and waits forever.
+         *
+         * @return never returns normally
+         * @throws InterruptedException if the executing thread is interrupted while waiting
+         */
+        @Override
+        public Void call() throws Exception
+        {
+            if (_waitLatch != null)
+            {
+                _waitLatch.countDown();
+            }
+
+            // wait forever; the loop guards against spurious wakeups, which would
+            // otherwise let the task complete normally
+            synchronized (this)
+            {
+                while (true)
+                {
+                    this.wait();
+                }
+            }
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org