Posted to dev@river.apache.org by Peter <ji...@zeus.net.au> on 2014/01/07 03:52:33 UTC

Fixing ServiceDiscoveryManager WAS: Re: TaskManager as an ExecutorService

I recently updated RetryTask to implement RunnableFuture (it already had very similar methods). It also no longer implements Task.runAfter, which has been pushed down to the subclasses that use it (i.e. those in ServiceDiscoveryManager). It wouldn't take much effort to add functionality to RetryTask to allow listeners, as ListenableFuture does.

When the listeners are notified of success, they submit themselves to the provided executor. Otherwise, they could be notified every time the retry task fails, and take some sort of action once it gives up.
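
To make the listener idea above concrete, here is a minimal sketch. It is not the real RetryTask: the names RetryListener and ListenableRetry are hypothetical, and it retries immediately in a loop, whereas a real implementation would presumably reschedule each attempt. It only illustrates the notification flow: listeners hear about each failure and about giving up, and on success they are submitted to the provided executor as dependent work.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

/** Hypothetical callback interface; not River's or Guava's API. */
interface RetryListener extends Runnable {
    /** Called after each failed attempt. */
    void attemptFailed(int attempt);

    /** Called once, when the task stops retrying. */
    void gaveUp(int attempts);

    // run() is the dependent work; it is submitted to the executor on success.
}

/** Hypothetical stand-in for a listener-aware retry task. */
class ListenableRetry implements Runnable {
    private final Callable<Boolean> attempt; // returns true on success
    private final int maxAttempts;
    private final Executor executor;         // runs listeners after success
    private final List<RetryListener> listeners = new CopyOnWriteArrayList<>();

    ListenableRetry(Callable<Boolean> attempt, int maxAttempts, Executor executor) {
        this.attempt = attempt;
        this.maxAttempts = maxAttempts;
        this.executor = executor;
    }

    void addListener(RetryListener listener) {
        listeners.add(listener);
    }

    @Override
    public void run() {
        for (int i = 1; i <= maxAttempts; i++) {
            boolean ok;
            try {
                ok = attempt.call();
            } catch (Exception e) {
                ok = false; // an exception counts as a failed attempt
            }
            if (ok) {
                // Success: each listener is queued as ordinary work.
                for (RetryListener l : listeners) executor.execute(l);
                return;
            }
            for (RetryListener l : listeners) l.attemptFailed(i);
        }
        for (RetryListener l : listeners) l.gaveUp(maxAttempts);
    }
}
```

With this shape, a dependent task never has to inspect the queue: it either runs because its prerequisite succeeded, or it is told that the prerequisite gave up.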

It probably isn't a good idea to include a Guava jar dependency, since Guava doesn't support serialization compatibility across releases.

Feel free to propose any alternative suggestions.

Please raise any objections to fixing ServiceDiscoveryManager in another thread.

Regards,

Peter.

----- Original message -----
> Gregg,
>
> Are you able to help out with ServiceDiscoveryManager?
>
> Fixing SDM would make a huge stability improvement.
>
> Guava has an interface called ListenableFuture, which sounds like a
> possible candidate: listeners are registered and notified in the event
> of failure or success.
>
> The Guava site claims it makes it possible to have complex chains of
> asynchronous operations.
>
> Regards,
>
> Peter.
>
> ----- Original message -----
> > I think the better choice is to not try and handle failure with retry
> > at all. Instead, we should use an API which allows the dependent task
> > to know whether its dependency has completed or failed. It can then
> > report failure if its dependency failed, or submit its own work to be
> > executed in the queue.
> >
> > This linking of application behavior into utility APIs is just not
> > good, testable design. It requires that behaviors outside of
> > the pool flow through to other uses of the API in non-programmatic
> > ways.
> >
> > Gregg Wonderly
> >        
> > On Jan 4, 2014, at 7:46 PM, Peter <ji...@zeus.net.au> wrote:
> >
> > > For a moment, let's consider how we might fix TaskManager while
> > > retaining the existing Task.runAfter method, and how the fix would
> > > impact TaskManager's users.
> > >
> > > TaskManager, like most thread pools, has a queue.
> > >
> > > Before a task is removed from the queue for execution, it is asked if
> > > it should "runAfter" any other task present in the queue. While this
> > > occurs, the queue is locked.
> > >
> > > Each task is removed from the queue before it executes.
> > >
> > > Now what happens if a task fails to execute, or its thread is
> > > suspended by the OS scheduler? Because the task is not in the queue
> > > during execution, other tasks that depend on it and need to
> > > "runAfter" it are allowed to execute before it completes.
> > >
> > > Because a task doesn't know which tasks depend upon it running first,
> > > the only way to fix TaskManager is to only remove a task after it
> > > completes successfully.
> > >
> > > Now, TaskManager has no scheduling capability, which means that the
> > > fix will require the task to both remain in the queue and consume a
> > > pool thread until it successfully completes, because once the
> > > Task.run method returns it is assumed to have completed successfully.
> > >
> > > RetryTask and WakeupManager would also need to be rewritten to
> > > accommodate this requirement.
> > >
> > > The result will be correctly ordered execution of tasks, with
> > > increased thread & memory consumption as well as significantly
> > > reduced throughput.
> > >
> > > The reduced throughput could also help mask other issues by reducing
> > > concurrency.
> > >
> > > Regards,
> > >
> > > Peter.
> > >
> > >
> > > ----- Original message -----
> > > > The problem is with TaskManager's public API method Task.runAfter.
> > > > This is well documented in River-344.
> > > >
> > > > The fix requires changing every class that uses it.
> > > >
> > > > As a thread pool, TaskManager is correct provided that no ordering
> > > > dependencies exist between tasks.
> > > >
> > > > TaskManager doesn't compare to Doug Lea's ExecutorService
> > > > implementations. It should be consigned to history; let's clean
> > > > our code up.
> > > >
> > > > What configuration choice did we originally have anyway? Any
> > > > Executor you like as long as it's TaskManager?
> > > >
> > > > TaskManager is not part of our public api, it's an implementation
> > > > detail in com.sun.*
> > > >
> > > > Regards,
> > > >
> > > > Peter.
> > > > ----- Original message -----
> > > > >
> > > > > I’d like you to make a reasonable case for why TaskManager needs
> > > > > to be replaced, requiring changes to many other classes that
> > > > > depend on TaskManager, rather than stating what the problem is
> > > > > with TaskManager and seeking to fix it, which would only affect
> > > > > TaskManager and not require modifying and then debugging other
> > > > > code.
> > > > >
> > > > > Greg.
> > > > >
> > > > > On Jan 4, 2014, at 5:53 AM, Peter Firmstone <ji...@zeus.net.au>
> > > > > wrote:
> > > > >
> > > > > > > Would you like me to add this class, so that existing
> > > > > > > configurations utilising a TaskManager can also be used?
> > > > > > > This might be useful for retaining backward compatibility with
> > > > > > > existing configurations.
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Peter.
> > > > > >
> > > > > >
> > > > > > /*
> > > > > > * Licensed to the Apache Software Foundation (ASF) under one
> > > > > > * or more contributor license agreements.        See the NOTICE
> > > > > > file * distributed with this work for additional information
> > > > > > * regarding copyright ownership. The ASF licenses this file
> > > > > > * to you under the Apache License, Version 2.0 (the
> > > > > > * "License"); you may not use this file except in compliance
> > > > > > * with the License. You may obtain a copy of the License at
> > > > > > *
> > > > > > *                     
> > > > > > http://www.apache.org/licenses/LICENSE-2.0 *
> > > > > > * Unless required by applicable law or agreed to in writing,
> > > > > > software * distributed under the License is distributed on an
> > > > > > "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
> > > > > > either express or implied. * See the License for the specific
> > > > > > language governing permissions and * limitations under the
> > > > > > License. */
> > > > > >
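> > > > > > > package com.sun.jini.thread;
> > > > > > >
> > > > > > > import com.sun.jini.thread.TaskManager.Task;
> > > > > > > import java.util.List;
> > > > > > > import java.util.concurrent.AbstractExecutorService;
> > > > > > > import java.util.concurrent.ExecutorService;
> > > > > > > import java.util.concurrent.RejectedExecutionException;
> > > > > > > import java.util.concurrent.TimeUnit;
> > > > > > >
> > > > > > > /**
> > > > > > >  * Adapts an existing TaskManager to the ExecutorService interface.
> > > > > > >  *
> > > > > > >  * @author peter
> > > > > > >  */
> > > > > > > public class TaskManagerWrapper extends AbstractExecutorService
> > > > > > >         implements ExecutorService {
> > > > > > >
> > > > > > >     private final TaskManager tm;
> > > > > > >     private final PoisonPill pill;
> > > > > > >     private volatile boolean isShutdown;
> > > > > > >
> > > > > > >     public TaskManagerWrapper(TaskManager manager) {
> > > > > > >         tm = manager;
> > > > > > >         isShutdown = false;
> > > > > > >         pill = new PoisonPill(manager);
> > > > > > >     }
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public void shutdown() {
> > > > > > >         isShutdown = true;
> > > > > > >         tm.add(pill); // Terminates the pool once queued work is done.
> > > > > > >     }
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public List<Runnable> shutdownNow() {
> > > > > > >         isShutdown = true;
> > > > > > >         tm.terminate();
> > > > > > >         List<Runnable> pending = tm.getPending();
> > > > > > >         synchronized (pill) {
> > > > > > >             // The pill will never run now, so release any waiters here.
> > > > > > >             pill.terminated = true;
> > > > > > >             pill.notifyAll();
> > > > > > >         }
> > > > > > >         return pending;
> > > > > > >     }
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public boolean isShutdown() {
> > > > > > >         return isShutdown;
> > > > > > >     }
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public boolean isTerminated() {
> > > > > > >         // Terminated only once the pool has actually stopped,
> > > > > > >         // not merely when shutdown has been requested.
> > > > > > >         synchronized (pill) {
> > > > > > >             return pill.terminated;
> > > > > > >         }
> > > > > > >     }
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public boolean awaitTermination(long timeout, TimeUnit unit)
> > > > > > >             throws InterruptedException {
> > > > > > >         long remaining = unit.toNanos(timeout);
> > > > > > >         final long deadline = System.nanoTime() + remaining;
> > > > > > >         synchronized (pill) {
> > > > > > >             while (!pill.terminated) {
> > > > > > >                 if (remaining <= 0) return false; // Timed out.
> > > > > > >                 // Wait on the pill's monitor, which is the lock we hold.
> > > > > > >                 TimeUnit.NANOSECONDS.timedWait(pill, remaining);
> > > > > > >                 remaining = deadline - System.nanoTime();
> > > > > > >             }
> > > > > > >         }
> > > > > > >         return true; // pill was terminated.
> > > > > > >     }
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public void execute(Runnable command) {
> > > > > > >         if (command == null) throw new NullPointerException();
> > > > > > >         if (isShutdown)
> > > > > > >             throw new RejectedExecutionException("TaskManager terminated");
> > > > > > >         // Hand the work to the wrapped TaskManager. Plain Runnables
> > > > > > >         // are wrapped as Tasks with no ordering dependencies.
> > > > > > >         tm.add(command instanceof Task
> > > > > > >                 ? (Task) command : new RunnableTask(command));
> > > > > > >     }
> > > > > > >
> > > > > > >     /** Wraps a Runnable as a Task that never waits for other tasks. */
> > > > > > >     private static class RunnableTask implements Task {
> > > > > > >
> > > > > > >         private final Runnable command;
> > > > > > >
> > > > > > >         RunnableTask(Runnable command) {
> > > > > > >             this.command = command;
> > > > > > >         }
> > > > > > >
> > > > > > >         @Override
> > > > > > >         public boolean runAfter(List tasks, int size) {
> > > > > > >             return false;
> > > > > > >         }
> > > > > > >
> > > > > > >         @Override
> > > > > > >         public void run() {
> > > > > > >             command.run();
> > > > > > >         }
> > > > > > >     }
> > > > > > >
> > > > > > >     private static class PoisonPill implements Task {
> > > > > > >
> > > > > > >         private final TaskManager tm;
> > > > > > >         boolean terminated; // Guarded by this pill's monitor.
> > > > > > >
> > > > > > >         PoisonPill(TaskManager tm) {
> > > > > > >             this.tm = tm;
> > > > > > >             terminated = false;
> > > > > > >         }
> > > > > > >
> > > > > > >         @Override
> > > > > > >         public boolean runAfter(List tasks, int size) {
> > > > > > >             // Make sure we always run last: wait while any task
> > > > > > >             // precedes us (only indices below size are considered).
> > > > > > >             return size > 0;
> > > > > > >         }
> > > > > > >
> > > > > > >         @Override
> > > > > > >         public void run() {
> > > > > > >             tm.terminate(); // Bye.
> > > > > > >             synchronized (this) {
> > > > > > >                 terminated = true;
> > > > > > >                 notifyAll();
> > > > > > >             }
> > > > > > >         }
> > > > > > >
> > > > > > >     }
> > > > > > >
> > > > > > > }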
> > > > >
> > > >
> > >
> >
>
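
As a rough illustration of Gregg's suggestion quoted above (let the dependent task learn whether its dependency completed or failed, instead of inspecting the queue), the sketch below chains two steps with CompletableFuture. It assumes Java 8 or later, which is an assumption beyond what the thread discusses. The class name DependencyChainDemo and the "register"/"notify" steps are hypothetical and are not taken from ServiceDiscoveryManager.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo: a dependent step runs only if its dependency succeeds. */
class DependencyChainDemo {

    static String runChain(boolean registrationFails) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // The dependency: succeeds or fails on a pool thread.
            CompletableFuture<String> register = CompletableFuture.supplyAsync(() -> {
                if (registrationFails) {
                    throw new IllegalStateException("register failed");
                }
                return "registered";
            }, pool);

            // The dependent step is submitted to the pool only after the
            // dependency completes normally; a failure skips it entirely.
            CompletableFuture<String> notify =
                    register.thenApplyAsync(r -> r + ", notified", pool);

            // Report either the result or the upstream failure.
            return notify
                    .handle((ok, err) -> err == null ? ok : "failed: " + rootMessage(err))
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    /** Unwraps wrapper exceptions to reach the original failure message. */
    private static String rootMessage(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t.getMessage();
    }
}
```

Ordering is expressed by the chain itself rather than by a runAfter check against the queue, so a failed or slow dependency cannot be overtaken by the work that depends on it.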