You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2018/06/25 19:10:59 UTC

[ambari] branch trunk updated: AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (#1613)

This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5460e89  AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (#1613)
5460e89 is described below

commit 5460e8952729854f1c032a781c9a8de608ba4475
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Mon Jun 25 22:10:56 2018 +0300

    AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (#1613)
    
    * AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (mpapirkovskyy)
    
    * AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (mpapirkovskyy)
---
 .../server/agent/stomp/AgentReportsController.java |   5 +-
 .../server/events/DefaultMessageEmitter.java       |   2 +-
 .../ambari/server/events/MessageEmitter.java       | 328 +++++++++++++--------
 .../apache/ambari/server/events/STOMPEvent.java    |  14 -
 .../utils/ScheduledExecutorCompletionService.java  |  58 ++++
 5 files changed, 262 insertions(+), 145 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 249a141..82ddb1c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -120,10 +120,11 @@ public class AgentReportsController {
   }
 
   @MessageMapping("/responses")
-  public ReportsResponse handleReceiveReport(AckReport ackReport) throws HostNotRegisteredException {
+  public ReportsResponse handleReceiveReport(@Header String simpSessionId, AckReport ackReport) throws HostNotRegisteredException {
+    Long hostId = agentSessionManager.getHost(simpSessionId).getHostId();
     LOG.debug("Handling agent receive report for execution message with messageId {}, status {}, reason {}",
         ackReport.getMessageId(), ackReport.getStatus(), ackReport.getReason());
-    defaultMessageEmitterProvider.get().processReceiveReport(ackReport);
+    defaultMessageEmitterProvider.get().processReceiveReport(hostId, ackReport);
     return new ReportsResponse();
   }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index f54b743..e9f5c93 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -83,7 +83,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
   }
 
   @Override
-  public void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException {
+  public void emitMessage(STOMPEvent event) throws AmbariException {
     if (StringUtils.isEmpty(getDestination(event))) {
       throw new MessageDestinationIsNotDefinedException(event.getType());
     }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
index 47b55ed..2df2ac8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
@@ -18,7 +18,7 @@
 package org.apache.ambari.server.events;
 
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -26,9 +26,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ambari.server.AmbariException;
@@ -36,6 +35,7 @@ import org.apache.ambari.server.HostNotRegisteredException;
 import org.apache.ambari.server.agent.AgentSessionManager;
 import org.apache.ambari.server.agent.stomp.dto.AckReport;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.utils.ScheduledExecutorCompletionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.messaging.MessageHeaders;
@@ -50,27 +50,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * Is used to define a strategy for emitting message to subscribers.
  */
 public abstract class MessageEmitter {
+  protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
   private final static Logger LOG = LoggerFactory.getLogger(MessageEmitter.class);
-
+  public final int retryCount;
+  public final int retryInterval;
   protected final AgentSessionManager agentSessionManager;
   protected final SimpMessagingTemplate simpMessagingTemplate;
-  private AmbariEventPublisher ambariEventPublisher;
-
-  protected final ScheduledExecutorService emittersExecutor = Executors.newScheduledThreadPool(10,
+  protected final ScheduledExecutorService emitExecutor = Executors.newScheduledThreadPool(10,
       new ThreadFactoryBuilder().setNameFormat("agent-message-emitter-%d").build());
-  protected final ExecutorService monitorsExecutor = Executors.newFixedThreadPool(10,
-      new ThreadFactoryBuilder().setNameFormat("ambari-message-monitor-%d").build());
-
-  protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
-  protected static final Long ADDITIONAL_TIMEOUT_TIME = 5L;
-  protected ConcurrentHashMap<Long, ScheduledFuture> unconfirmedMessages = new ConcurrentHashMap<>();
-  protected ConcurrentHashMap<Long, BlockingQueue<ExecutionCommandEvent>> messagesToEmit = new ConcurrentHashMap<>();
-
-  // is used to cancel agent queue check on unregistering
-  protected ConcurrentHashMap<Long, Future> monitors = new ConcurrentHashMap<>();
-
-  public final int retryCount;
-  public final int retryInterval;
+  protected final ExecutorService monitorExecutor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("agent-message-monitor-%d").build());
+  protected final ExecutorService retryExecutor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("agent-message-retry-%d").build());
+  protected final ScheduledExecutorCompletionService<EmitTaskWrapper> emitCompletionService =
+      new ScheduledExecutorCompletionService(emitExecutor, new LinkedBlockingQueue<>());
+  protected ConcurrentHashMap<Long, EmitTaskWrapper> unconfirmedMessages = new ConcurrentHashMap<>();
+  protected ConcurrentHashMap<Long, BlockingQueue<EmitTaskWrapper>> messagesToEmit = new ConcurrentHashMap<>();
+  private AmbariEventPublisher ambariEventPublisher;
 
   public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate,
                         AmbariEventPublisher ambariEventPublisher, int retryCount, int retryInterval) {
@@ -80,134 +76,56 @@ public abstract class MessageEmitter {
     this.retryCount = retryCount;
     this.retryInterval = retryInterval;
     ambariEventPublisher.register(this);
+    monitorExecutor.execute(new MessagesToEmitMonitor());
+    retryExecutor.execute(new MessagesToRetryMonitor());
   }
 
   /**
    * Determines destinations and emits message.
+   *
    * @param event message should to be emitted.
    * @throws AmbariException
    */
-  abstract void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException;
+  abstract void emitMessage(STOMPEvent event) throws AmbariException;
 
-  public void emitMessageRetriable(ExecutionCommandEvent event) throws AmbariException, InterruptedException {
+  public void emitMessageRetriable(ExecutionCommandEvent event) {
     // set message identifier used to recognize NACK/ACK agent response
-    event.setMessageId(MESSAGE_ID.getAndIncrement());
+    EmitTaskWrapper wrapper = new EmitTaskWrapper(0, MESSAGE_ID.getAndIncrement(), event);
 
     Long hostId = event.getHostId();
-    if (!messagesToEmit.containsKey(hostId)) {
-      LOG.error("Trying to emit message to unregistered host with id {}", hostId);
-      return;
-    }
-    messagesToEmit.get(hostId).add(event);
-  }
-
-  private class MessagesToEmitMonitor implements Runnable {
-
-    private final Long hostId;
-
-    public MessagesToEmitMonitor(Long hostId) {
-      this.hostId = hostId;
-    }
-
-    @Override
-    public void run() {
-      while (true) {
-        try {
-          ExecutionCommandEvent event = messagesToEmit.get(hostId).take();
-          EmitMessageTask emitMessageTask = new EmitMessageTask(event);
-          ScheduledFuture scheduledFuture =
-              emittersExecutor.scheduleAtFixedRate(emitMessageTask,
-                  0, retryInterval, TimeUnit.SECONDS);
-          emitMessageTask.setScheduledFuture(scheduledFuture);
-          unconfirmedMessages.put(event.getMessageId(), scheduledFuture);
-
-          long timeout = retryCount * retryInterval + ADDITIONAL_TIMEOUT_TIME;
-          try {
-            scheduledFuture.get(timeout, TimeUnit.SECONDS);
-          } catch (TimeoutException e) {
-            processMessageMissing(scheduledFuture, event);
-          }
-        } catch (InterruptedException e) {
-          // can be interrupted when no responses were received from agent and HEARTBEAT_LOST will be fired
-          return;
-        } catch (CancellationException e) {
-          // scheduled tasks can be canceled
-        } catch (ExecutionException e) {
-          LOG.error("Error during preparing command to emit", e);
-          // generate delivery failed event
-          ambariEventPublisher.publish(new MessageNotDelivered(hostId));
-          return;
-        }
+    messagesToEmit.compute(hostId, (id, hostMessages) -> {
+      if (hostMessages == null) {
+        LOG.error("Trying to emit message to unregistered host with id {}", hostId);
+        return null;
+      } else {
+        hostMessages.add(wrapper);
+        return hostMessages;
       }
-    }
+    });
   }
 
-  public void processReceiveReport(AckReport ackReport) {
+  public void processReceiveReport(Long hostId, AckReport ackReport) {
     Long messageId = ackReport.getMessageId();
     if (AckReport.AckStatus.OK.equals(ackReport.getStatus())) {
-      if (unconfirmedMessages.containsKey(messageId)) {
-        unconfirmedMessages.get(messageId).cancel(true);
-        unconfirmedMessages.remove(messageId);
-      } else {
-        LOG.warn("OK agent report was received again for already complete command with message id {}", messageId);
-      }
+      unconfirmedMessages.compute(hostId, (id, commandInUse) -> {
+        if (commandInUse != null && commandInUse.getMessageId().equals(ackReport.getMessageId())) {
+          return null;
+        } else {
+          LOG.warn("OK agent report was received again for already complete command with message id {}", messageId);
+        }
+        return commandInUse;
+      });
     } else {
       LOG.error("Received {} agent report for execution command with messageId {} with following reason: {}",
           ackReport.getStatus(), messageId, ackReport.getReason());
     }
   }
 
-  private class EmitMessageTask implements Runnable {
-
-    private final ExecutionCommandEvent executionCommandEvent;
-    private ScheduledFuture scheduledFuture;
-    private int retry_counter = 0;
-
-    public EmitMessageTask(ExecutionCommandEvent executionCommandEvent) {
-      this.executionCommandEvent = executionCommandEvent;
-    }
-
-    public void setScheduledFuture(ScheduledFuture scheduledFuture) {
-      this.scheduledFuture = scheduledFuture;
-    }
-
-    @Override
-    public void run() {
-      if (retry_counter >= retryCount) {
-        processMessageMissing(scheduledFuture, executionCommandEvent);
-        return;
-      }
-      try {
-        retry_counter++;
-        emitExecutionCommandToHost(executionCommandEvent);
-      } catch (AmbariException e) {
-        LOG.error("Error during emitting execution command with message id {} on attempt {}",
-            executionCommandEvent.getMessageId(), retry_counter, e);
-      }
-    }
-  }
-
-  private void processMessageMissing(ScheduledFuture scheduledFuture, ExecutionCommandEvent executionCommandEvent) {
-    // generate delivery failed event and cancel emitter
-    ambariEventPublisher.publish(new MessageNotDelivered(executionCommandEvent.getHostId()));
-    unconfirmedMessages.remove(executionCommandEvent.getMessageId());
-
-    // remove commands queue for host
-    messagesToEmit.remove(executionCommandEvent.getHostId());
-
-    // cancel retrying to emit command
-    scheduledFuture.cancel(true);
-
-    // cancel checking for new commands for host
-    if (monitors.containsKey(executionCommandEvent.getHostId())) {
-      monitors.get(executionCommandEvent.getHostId()).cancel(true);
-    }
-  }
-
   protected abstract String getDestination(STOMPEvent stompEvent);
 
   /**
    * Creates STOMP message header.
+   *
    * @param sessionId
    * @return message header.
    */
@@ -217,6 +135,7 @@ public abstract class MessageEmitter {
 
   /**
    * Creates STOMP message header.
+   *
    * @param sessionId
    * @return message header.
    */
@@ -232,6 +151,7 @@ public abstract class MessageEmitter {
 
   /**
    * Emits message to all subscribers.
+   *
    * @param event message should to be emitted.
    */
   protected void emitMessageToAll(STOMPEvent event) {
@@ -241,6 +161,7 @@ public abstract class MessageEmitter {
 
   /**
    * Emit message to specified host only.
+   *
    * @param event message should to be emitted.
    * @throws HostNotRegisteredException in case host is not registered.
    */
@@ -254,12 +175,14 @@ public abstract class MessageEmitter {
 
   /**
    * Emit execution command to specified host only.
-   * @param event message should to be emitted.
+   *
+   * @param eventWrapper wrapper holding the execution command to be emitted and its message id.
    * @throws HostNotRegisteredException in case host is not registered.
    */
-  protected void emitExecutionCommandToHost(ExecutionCommandEvent event) throws HostNotRegisteredException {
+  protected void emitExecutionCommandToHost(EmitTaskWrapper eventWrapper) throws HostNotRegisteredException {
+    ExecutionCommandEvent event = eventWrapper.getExecutionCommandEvent();
     Long hostId = event.getHostId();
-    Long messageId = event.getMessageId();
+    Long messageId = eventWrapper.getMessageId();
     String sessionId = agentSessionManager.getSessionId(hostId);
     LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
     MessageHeaders headers = createHeaders(sessionId, messageId);
@@ -269,9 +192,158 @@ public abstract class MessageEmitter {
   @Subscribe
   public void onHostRegister(HostRegisteredEvent hostRegisteredEvent) {
     Long hostId = hostRegisteredEvent.getHostId();
-    if (!messagesToEmit.containsKey(hostId)) {
-      messagesToEmit.put(hostId, new LinkedBlockingQueue<>());
-      monitors.put(hostId, monitorsExecutor.submit(new MessagesToEmitMonitor(hostId)));
+    messagesToEmit.computeIfAbsent(hostId, id -> new LinkedBlockingQueue<>());
+  }
+
+  /**
+   * Performs the first emit of queued messages. At most one command is in flight per host at any time.
+   * A host is released when the agent's ACK response is received or a {@link MessageNotDelivered} event is fired.
+   * If no new command was emitted during a pass over the hosts, the thread sleeps for a short time.
+   */
+  private class MessagesToEmitMonitor implements Runnable {
+
+    /**
+     * Tracks whether any message was emitted during the current pass over the hosts.
+     */
+    private boolean anyActionPerformed;
+
+    @Override
+    public void run() {
+      while (true) {
+        anyActionPerformed = false;
+        for (Long hostId : messagesToEmit.keySet()) {
+          unconfirmedMessages.computeIfAbsent(hostId, id -> {
+            EmitTaskWrapper event = messagesToEmit.get(hostId).poll();
+            if (event != null) {
+              LOG.info("Schedule execution command emitting, retry: {}, messageId: {}",
+                  event.getRetryCounter(), event.getMessageId());
+              emitCompletionService.submit(new EmitMessageTask(event, false));
+              anyActionPerformed = true;
+            }
+            return event;
+          });
+        }
+        if (!anyActionPerformed) {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            LOG.error("Exception during sleep", e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Once an emit of a message completes, the message is scheduled to be re-emitted after a delay.
+   * The re-emit task also checks whether the message has already been confirmed by the agent.
+   * Once the {@link MessageEmitter#retryCount} retry limit is reached, a {@link MessageNotDelivered} event is fired.
+   */
+  private class MessagesToRetryMonitor implements Runnable {
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          Future<EmitTaskWrapper> future = emitCompletionService.take();
+          EmitTaskWrapper result = future.get();
+          Long hostId = result.getExecutionCommandEvent().getHostId();
+          unconfirmedMessages.compute(hostId, (id, commandInUse) -> {
+            if (commandInUse != null && commandInUse.getMessageId().equals(result.getMessageId())) {
+              if (result.getRetryCounter() < retryCount) {
+                result.retry();
+                LOG.warn("Reschedule execution command emitting, retry: {}, messageId: {}",
+                    result.getRetryCounter(), result.getMessageId());
+                emitCompletionService.schedule(new EmitMessageTask(result, true), retryInterval, TimeUnit.SECONDS);
+              } else {
+                ExecutionCommandEvent event = result.getExecutionCommandEvent();
+                // remove commands queue for host
+                messagesToEmit.remove(event.getHostId());
+
+                // generate delivery failed event and cancel emitter
+                ambariEventPublisher.publish(new MessageNotDelivered(event.getHostId()));
+
+                return null;
+              }
+            }
+            return commandInUse;
+          });
+        } catch (InterruptedException e) {
+          LOG.error("Retry message emitting monitor was interrupted", e);
+        } catch (ExecutionException e) {
+          LOG.error("Exception during message emitting retry", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Task to emit command.
+   */
+  private class EmitMessageTask implements Callable<EmitTaskWrapper> {
+
+    /**
+     * Wrapped command to emit.
+     */
+    private final EmitTaskWrapper emitTaskWrapper;
+
+    /**
+     * Whether {@link #call()} should emit only if the command is still unconfirmed, i.e. no ACK has been received yet.
+     */
+    private final boolean checkRelevance;
+
+    public EmitMessageTask(EmitTaskWrapper emitTaskWrapper, boolean checkRelevance) {
+      this.emitTaskWrapper = emitTaskWrapper;
+      this.checkRelevance = checkRelevance;
+    }
+
+    @Override
+    public EmitTaskWrapper call() throws Exception {
+      try {
+        if (checkRelevance) {
+          Long hostId = emitTaskWrapper.getExecutionCommandEvent().getHostId();
+          EmitTaskWrapper commandInUse = unconfirmedMessages.get(hostId);
+
+          // check ack was already received
+          if (commandInUse != null && commandInUse.getMessageId().equals(emitTaskWrapper.getMessageId())) {
+            emitExecutionCommandToHost(emitTaskWrapper);
+          }
+        } else {
+          emitExecutionCommandToHost(emitTaskWrapper);
+        }
+      } catch (HostNotRegisteredException e) {
+        LOG.error("Trying to emit execution command to unregistered host {} on attempt {}",
+            emitTaskWrapper.getMessageId(), emitTaskWrapper.getRetryCounter(), e);
+      }
+      return emitTaskWrapper;
+    }
+  }
+
+  private class EmitTaskWrapper {
+    private final Long messageId;
+    private final ExecutionCommandEvent executionCommandEvent;
+    private final AtomicInteger retryCounter;
+
+    public EmitTaskWrapper(int retryCounter, Long messageId, ExecutionCommandEvent executionCommandEvent) {
+      this.retryCounter = new AtomicInteger(retryCounter);
+      this.messageId = messageId;
+      this.executionCommandEvent = executionCommandEvent;
+    }
+
+    public int getRetryCounter() {
+      return retryCounter.get();
+    }
+
+    public ExecutionCommandEvent getExecutionCommandEvent() {
+      return executionCommandEvent;
+    }
+
+    public Long getMessageId() {
+      return messageId;
+    }
+
+    public void retry() {
+      retryCounter.incrementAndGet();
     }
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
index d7e2253..15c3b1e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
@@ -25,11 +25,6 @@ import java.beans.Transient;
 public abstract class STOMPEvent {
 
   /**
-   * Message id is unique for original messages, but the same for all re-emitted messages.
-   */
-  private Long messageId;
-
-  /**
    * Update type.
    */
   protected final Type type;
@@ -48,15 +43,6 @@ public abstract class STOMPEvent {
     return type.getMetricName();
   }
 
-  @Transient
-  public Long getMessageId() {
-    return messageId;
-  }
-
-  public void setMessageId(Long messageId) {
-    this.messageId = messageId;
-  }
-
   public enum Type {
     ALERT("events.alerts"),
     ALERT_GROUP("events.alert_group"),
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/ScheduledExecutorCompletionService.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/ScheduledExecutorCompletionService.java
new file mode 100644
index 0000000..047cef9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/ScheduledExecutorCompletionService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ScheduledExecutorCompletionService<V> extends ExecutorCompletionService<V> {
+  private final ScheduledExecutorService scheduledExecutor;
+  private final BlockingQueue<Future<V>> queue;
+
+  private class QueueingFuture extends FutureTask<Void> {
+    QueueingFuture(RunnableFuture<V> task) {
+      super(task, null);
+      this.task = task;
+    }
+    protected void done() { queue.add(task); }
+    private final Future<V> task;
+  }
+
+  public ScheduledExecutorCompletionService(ScheduledExecutorService scheduledExecutor, BlockingQueue<Future<V>> queue) {
+    super(scheduledExecutor, queue);
+    this.scheduledExecutor = scheduledExecutor;
+    this.queue = queue;
+  }
+
+  public Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
+    if (task == null) throw new NullPointerException();
+    RunnableFuture<V> f = newTaskFor(task);
+    scheduledExecutor.schedule(new QueueingFuture(f), delay, unit);
+    return f;
+  }
+
+  private RunnableFuture<V> newTaskFor(Callable<V> task) {
+    return new FutureTask<V>(task);
+  }
+}