You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2018/06/25 19:10:59 UTC
[ambari] branch trunk updated: AMBARI-24177. Ambari gets stuck at
host checks (after successful registration) when installing more number of
nodes via UI. (#1613)
This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5460e89 AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (#1613)
5460e89 is described below
commit 5460e8952729854f1c032a781c9a8de608ba4475
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Mon Jun 25 22:10:56 2018 +0300
AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (#1613)
* AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (mpapirkovskyy)
* AMBARI-24177. Ambari gets stuck at host checks (after successful registration) when installing more number of nodes via UI. (mpapirkovskyy)
---
.../server/agent/stomp/AgentReportsController.java | 5 +-
.../server/events/DefaultMessageEmitter.java | 2 +-
.../ambari/server/events/MessageEmitter.java | 328 +++++++++++++--------
.../apache/ambari/server/events/STOMPEvent.java | 14 -
.../utils/ScheduledExecutorCompletionService.java | 58 ++++
5 files changed, 262 insertions(+), 145 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 249a141..82ddb1c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -120,10 +120,11 @@ public class AgentReportsController {
}
@MessageMapping("/responses")
- public ReportsResponse handleReceiveReport(AckReport ackReport) throws HostNotRegisteredException {
+ public ReportsResponse handleReceiveReport(@Header String simpSessionId, AckReport ackReport) throws HostNotRegisteredException {
+ Long hostId = agentSessionManager.getHost(simpSessionId).getHostId();
LOG.debug("Handling agent receive report for execution message with messageId {}, status {}, reason {}",
ackReport.getMessageId(), ackReport.getStatus(), ackReport.getReason());
- defaultMessageEmitterProvider.get().processReceiveReport(ackReport);
+ defaultMessageEmitterProvider.get().processReceiveReport(hostId, ackReport);
return new ReportsResponse();
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index f54b743..e9f5c93 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -83,7 +83,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
}
@Override
- public void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException {
+ public void emitMessage(STOMPEvent event) throws AmbariException {
if (StringUtils.isEmpty(getDestination(event))) {
throw new MessageDestinationIsNotDefinedException(event.getType());
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
index 47b55ed..2df2ac8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
@@ -18,7 +18,7 @@
package org.apache.ambari.server.events;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -26,9 +26,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ambari.server.AmbariException;
@@ -36,6 +35,7 @@ import org.apache.ambari.server.HostNotRegisteredException;
import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.agent.stomp.dto.AckReport;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.utils.ScheduledExecutorCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
@@ -50,27 +50,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* Is used to define a strategy for emitting message to subscribers.
*/
public abstract class MessageEmitter {
+ protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
private final static Logger LOG = LoggerFactory.getLogger(MessageEmitter.class);
-
+ public final int retryCount;
+ public final int retryInterval;
protected final AgentSessionManager agentSessionManager;
protected final SimpMessagingTemplate simpMessagingTemplate;
- private AmbariEventPublisher ambariEventPublisher;
-
- protected final ScheduledExecutorService emittersExecutor = Executors.newScheduledThreadPool(10,
+ protected final ScheduledExecutorService emitExecutor = Executors.newScheduledThreadPool(10,
new ThreadFactoryBuilder().setNameFormat("agent-message-emitter-%d").build());
- protected final ExecutorService monitorsExecutor = Executors.newFixedThreadPool(10,
- new ThreadFactoryBuilder().setNameFormat("ambari-message-monitor-%d").build());
-
- protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
- protected static final Long ADDITIONAL_TIMEOUT_TIME = 5L;
- protected ConcurrentHashMap<Long, ScheduledFuture> unconfirmedMessages = new ConcurrentHashMap<>();
- protected ConcurrentHashMap<Long, BlockingQueue<ExecutionCommandEvent>> messagesToEmit = new ConcurrentHashMap<>();
-
- // is used to cancel agent queue check on unregistering
- protected ConcurrentHashMap<Long, Future> monitors = new ConcurrentHashMap<>();
-
- public final int retryCount;
- public final int retryInterval;
+ protected final ExecutorService monitorExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("agent-message-monitor-%d").build());
+ protected final ExecutorService retryExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("agent-message-retry-%d").build());
+ protected final ScheduledExecutorCompletionService<EmitTaskWrapper> emitCompletionService =
+ new ScheduledExecutorCompletionService(emitExecutor, new LinkedBlockingQueue<>());
+ protected ConcurrentHashMap<Long, EmitTaskWrapper> unconfirmedMessages = new ConcurrentHashMap<>();
+ protected ConcurrentHashMap<Long, BlockingQueue<EmitTaskWrapper>> messagesToEmit = new ConcurrentHashMap<>();
+ private AmbariEventPublisher ambariEventPublisher;
public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate,
AmbariEventPublisher ambariEventPublisher, int retryCount, int retryInterval) {
@@ -80,134 +76,56 @@ public abstract class MessageEmitter {
this.retryCount = retryCount;
this.retryInterval = retryInterval;
ambariEventPublisher.register(this);
+ monitorExecutor.execute(new MessagesToEmitMonitor());
+ retryExecutor.execute(new MessagesToRetryMonitor());
}
/**
* Determines destinations and emits message.
+ *
* @param event message should to be emitted.
* @throws AmbariException
*/
- abstract void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException;
+ abstract void emitMessage(STOMPEvent event) throws AmbariException;
- public void emitMessageRetriable(ExecutionCommandEvent event) throws AmbariException, InterruptedException {
+ public void emitMessageRetriable(ExecutionCommandEvent event) {
// set message identifier used to recognize NACK/ACK agent response
- event.setMessageId(MESSAGE_ID.getAndIncrement());
+ EmitTaskWrapper wrapper = new EmitTaskWrapper(0, MESSAGE_ID.getAndIncrement(), event);
Long hostId = event.getHostId();
- if (!messagesToEmit.containsKey(hostId)) {
- LOG.error("Trying to emit message to unregistered host with id {}", hostId);
- return;
- }
- messagesToEmit.get(hostId).add(event);
- }
-
- private class MessagesToEmitMonitor implements Runnable {
-
- private final Long hostId;
-
- public MessagesToEmitMonitor(Long hostId) {
- this.hostId = hostId;
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- ExecutionCommandEvent event = messagesToEmit.get(hostId).take();
- EmitMessageTask emitMessageTask = new EmitMessageTask(event);
- ScheduledFuture scheduledFuture =
- emittersExecutor.scheduleAtFixedRate(emitMessageTask,
- 0, retryInterval, TimeUnit.SECONDS);
- emitMessageTask.setScheduledFuture(scheduledFuture);
- unconfirmedMessages.put(event.getMessageId(), scheduledFuture);
-
- long timeout = retryCount * retryInterval + ADDITIONAL_TIMEOUT_TIME;
- try {
- scheduledFuture.get(timeout, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- processMessageMissing(scheduledFuture, event);
- }
- } catch (InterruptedException e) {
- // can be interrupted when no responses were received from agent and HEARTBEAT_LOST will be fired
- return;
- } catch (CancellationException e) {
- // scheduled tasks can be canceled
- } catch (ExecutionException e) {
- LOG.error("Error during preparing command to emit", e);
- // generate delivery failed event
- ambariEventPublisher.publish(new MessageNotDelivered(hostId));
- return;
- }
+ messagesToEmit.compute(hostId, (id, hostMessages) -> {
+ if (hostMessages == null) {
+ LOG.error("Trying to emit message to unregistered host with id {}", hostId);
+ return null;
+ } else {
+ hostMessages.add(wrapper);
+ return hostMessages;
}
- }
+ });
}
- public void processReceiveReport(AckReport ackReport) {
+ public void processReceiveReport(Long hostId, AckReport ackReport) {
Long messageId = ackReport.getMessageId();
if (AckReport.AckStatus.OK.equals(ackReport.getStatus())) {
- if (unconfirmedMessages.containsKey(messageId)) {
- unconfirmedMessages.get(messageId).cancel(true);
- unconfirmedMessages.remove(messageId);
- } else {
- LOG.warn("OK agent report was received again for already complete command with message id {}", messageId);
- }
+ unconfirmedMessages.compute(hostId, (id, commandInUse) -> {
+ if (commandInUse != null && commandInUse.getMessageId().equals(ackReport.getMessageId())) {
+ return null;
+ } else {
+ LOG.warn("OK agent report was received again for already complete command with message id {}", messageId);
+ }
+ return commandInUse;
+ });
} else {
LOG.error("Received {} agent report for execution command with messageId {} with following reason: {}",
ackReport.getStatus(), messageId, ackReport.getReason());
}
}
- private class EmitMessageTask implements Runnable {
-
- private final ExecutionCommandEvent executionCommandEvent;
- private ScheduledFuture scheduledFuture;
- private int retry_counter = 0;
-
- public EmitMessageTask(ExecutionCommandEvent executionCommandEvent) {
- this.executionCommandEvent = executionCommandEvent;
- }
-
- public void setScheduledFuture(ScheduledFuture scheduledFuture) {
- this.scheduledFuture = scheduledFuture;
- }
-
- @Override
- public void run() {
- if (retry_counter >= retryCount) {
- processMessageMissing(scheduledFuture, executionCommandEvent);
- return;
- }
- try {
- retry_counter++;
- emitExecutionCommandToHost(executionCommandEvent);
- } catch (AmbariException e) {
- LOG.error("Error during emitting execution command with message id {} on attempt {}",
- executionCommandEvent.getMessageId(), retry_counter, e);
- }
- }
- }
-
- private void processMessageMissing(ScheduledFuture scheduledFuture, ExecutionCommandEvent executionCommandEvent) {
- // generate delivery failed event and cancel emitter
- ambariEventPublisher.publish(new MessageNotDelivered(executionCommandEvent.getHostId()));
- unconfirmedMessages.remove(executionCommandEvent.getMessageId());
-
- // remove commands queue for host
- messagesToEmit.remove(executionCommandEvent.getHostId());
-
- // cancel retrying to emit command
- scheduledFuture.cancel(true);
-
- // cancel checking for new commands for host
- if (monitors.containsKey(executionCommandEvent.getHostId())) {
- monitors.get(executionCommandEvent.getHostId()).cancel(true);
- }
- }
-
protected abstract String getDestination(STOMPEvent stompEvent);
/**
* Creates STOMP message header.
+ *
* @param sessionId
* @return message header.
*/
@@ -217,6 +135,7 @@ public abstract class MessageEmitter {
/**
* Creates STOMP message header.
+ *
* @param sessionId
* @return message header.
*/
@@ -232,6 +151,7 @@ public abstract class MessageEmitter {
/**
* Emits message to all subscribers.
+ *
* @param event message should to be emitted.
*/
protected void emitMessageToAll(STOMPEvent event) {
@@ -241,6 +161,7 @@ public abstract class MessageEmitter {
/**
* Emit message to specified host only.
+ *
* @param event message should to be emitted.
* @throws HostNotRegisteredException in case host is not registered.
*/
@@ -254,12 +175,14 @@ public abstract class MessageEmitter {
/**
* Emit execution command to specified host only.
- * @param event message should to be emitted.
+ *
+ * @param eventWrapper wrapped message to be emitted.
* @throws HostNotRegisteredException in case host is not registered.
*/
- protected void emitExecutionCommandToHost(ExecutionCommandEvent event) throws HostNotRegisteredException {
+ protected void emitExecutionCommandToHost(EmitTaskWrapper eventWrapper) throws HostNotRegisteredException {
+ ExecutionCommandEvent event = eventWrapper.getExecutionCommandEvent();
Long hostId = event.getHostId();
- Long messageId = event.getMessageId();
+ Long messageId = eventWrapper.getMessageId();
String sessionId = agentSessionManager.getSessionId(hostId);
LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
MessageHeaders headers = createHeaders(sessionId, messageId);
@@ -269,9 +192,158 @@ public abstract class MessageEmitter {
@Subscribe
public void onHostRegister(HostRegisteredEvent hostRegisteredEvent) {
Long hostId = hostRegisteredEvent.getHostId();
- if (!messagesToEmit.containsKey(hostId)) {
- messagesToEmit.put(hostId, new LinkedBlockingQueue<>());
- monitors.put(hostId, monitorsExecutor.submit(new MessagesToEmitMonitor(hostId)));
+ messagesToEmit.computeIfAbsent(hostId, id -> new LinkedBlockingQueue<>());
+ }
+
+ /**
+ * Performs the first emit of newly arrived messages. Only a single command is in process per host at any time.
+ * The host is released when the agent's ACK response is received or a {@link MessageNotDelivered} event is fired.
+ * If no messages were emitted during a pass over the hosts, the thread sleeps for a short time.
+ */
+ private class MessagesToEmitMonitor implements Runnable {
+
+ /**
+ * Tracks whether any message was emitted during the current pass over the available hosts.
+ */
+ private boolean anyActionPerformed;
+
+ @Override
+ public void run() {
+ while (true) {
+ anyActionPerformed = false;
+ for (Long hostId : messagesToEmit.keySet()) {
+ unconfirmedMessages.computeIfAbsent(hostId, id -> {
+ EmitTaskWrapper event = messagesToEmit.get(hostId).poll();
+ if (event != null) {
+ LOG.info("Schedule execution command emitting, retry: {}, messageId: {}",
+ event.getRetryCounter(), event.getMessageId());
+ emitCompletionService.submit(new EmitMessageTask(event, false));
+ anyActionPerformed = true;
+ }
+ return event;
+ });
+ }
+ if (!anyActionPerformed) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ LOG.error("Exception during sleep", e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * After the first emit completes, the message is scheduled to be re-emitted after a delay.
+ * The re-emit task also checks whether the message has already been delivered.
+ * Once the {@link MessageEmitter#retryCount} retry limit is exceeded, a {@link MessageNotDelivered} event is fired.
+ */
+ private class MessagesToRetryMonitor implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Future<EmitTaskWrapper> future = emitCompletionService.take();
+ EmitTaskWrapper result = future.get();
+ Long hostId = result.getExecutionCommandEvent().getHostId();
+ unconfirmedMessages.compute(hostId, (id, commandInUse) -> {
+ if (commandInUse != null && commandInUse.getMessageId().equals(result.getMessageId())) {
+ if (result.getRetryCounter() < retryCount) {
+ result.retry();
+ LOG.warn("Reschedule execution command emitting, retry: {}, messageId: {}",
+ result.getRetryCounter(), result.getMessageId());
+ emitCompletionService.schedule(new EmitMessageTask(result, true), retryInterval, TimeUnit.SECONDS);
+ } else {
+ ExecutionCommandEvent event = result.getExecutionCommandEvent();
+ // remove commands queue for host
+ messagesToEmit.remove(event.getHostId());
+
+ // generate delivery failed event and cancel emitter
+ ambariEventPublisher.publish(new MessageNotDelivered(event.getHostId()));
+
+ return null;
+ }
+ }
+ return commandInUse;
+ });
+ } catch (InterruptedException e) {
+ LOG.error("Retry message emitting monitor was interrupted", e);
+ } catch (ExecutionException e) {
+ LOG.error("Exception during message emitting retry", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Task to emit command.
+ */
+ private class EmitMessageTask implements Callable<EmitTaskWrapper> {
+
+ /**
+ * Wrapped command to emit.
+ */
+ private final EmitTaskWrapper emitTaskWrapper;
+
+ /**
+ * Whether {@link #call()} should first check if the ACK has already been received, and skip emitting in that case.
+ */
+ private final boolean checkRelevance;
+
+ public EmitMessageTask(EmitTaskWrapper emitTaskWrapper, boolean checkRelevance) {
+ this.emitTaskWrapper = emitTaskWrapper;
+ this.checkRelevance = checkRelevance;
+ }
+
+ @Override
+ public EmitTaskWrapper call() throws Exception {
+ try {
+ if (checkRelevance) {
+ Long hostId = emitTaskWrapper.getExecutionCommandEvent().getHostId();
+ EmitTaskWrapper commandInUse = unconfirmedMessages.get(hostId);
+
+ // re-emit only if the ACK has not been received yet (command is still unconfirmed)
+ if (commandInUse != null && commandInUse.getMessageId().equals(emitTaskWrapper.getMessageId())) {
+ emitExecutionCommandToHost(emitTaskWrapper);
+ }
+ } else {
+ emitExecutionCommandToHost(emitTaskWrapper);
+ }
+ } catch (HostNotRegisteredException e) {
+ LOG.error("Trying to emit execution command to unregistered host {} on attempt {}",
+ emitTaskWrapper.getMessageId(), emitTaskWrapper.getRetryCounter(), e);
+ }
+ return emitTaskWrapper;
+ }
+ }
+
+ private class EmitTaskWrapper {
+ private final Long messageId;
+ private final ExecutionCommandEvent executionCommandEvent;
+ private final AtomicInteger retryCounter;
+
+ public EmitTaskWrapper(int retryCounter, Long messageId, ExecutionCommandEvent executionCommandEvent) {
+ this.retryCounter = new AtomicInteger(retryCounter);
+ this.messageId = messageId;
+ this.executionCommandEvent = executionCommandEvent;
+ }
+
+ public int getRetryCounter() {
+ return retryCounter.get();
+ }
+
+ public ExecutionCommandEvent getExecutionCommandEvent() {
+ return executionCommandEvent;
+ }
+
+ public Long getMessageId() {
+ return messageId;
+ }
+
+ public void retry() {
+ retryCounter.incrementAndGet();
}
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
index d7e2253..15c3b1e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
@@ -25,11 +25,6 @@ import java.beans.Transient;
public abstract class STOMPEvent {
/**
- * Message id is unique for original messages, but the same for all re-emitted messages.
- */
- private Long messageId;
-
- /**
* Update type.
*/
protected final Type type;
@@ -48,15 +43,6 @@ public abstract class STOMPEvent {
return type.getMetricName();
}
- @Transient
- public Long getMessageId() {
- return messageId;
- }
-
- public void setMessageId(Long messageId) {
- this.messageId = messageId;
- }
-
public enum Type {
ALERT("events.alerts"),
ALERT_GROUP("events.alert_group"),
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/ScheduledExecutorCompletionService.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/ScheduledExecutorCompletionService.java
new file mode 100644
index 0000000..047cef9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/ScheduledExecutorCompletionService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ScheduledExecutorCompletionService<V> extends ExecutorCompletionService<V> {
+ private final ScheduledExecutorService scheduledExecutor;
+ private final BlockingQueue<Future<V>> queue;
+
+ private class QueueingFuture extends FutureTask<Void> {
+ QueueingFuture(RunnableFuture<V> task) {
+ super(task, null);
+ this.task = task;
+ }
+ protected void done() { queue.add(task); }
+ private final Future<V> task;
+ }
+
+ public ScheduledExecutorCompletionService(ScheduledExecutorService scheduledExecutor, BlockingQueue<Future<V>> queue) {
+ super(scheduledExecutor, queue);
+ this.scheduledExecutor = scheduledExecutor;
+ this.queue = queue;
+ }
+
+ public Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
+ if (task == null) throw new NullPointerException();
+ RunnableFuture<V> f = newTaskFor(task);
+ scheduledExecutor.schedule(new QueueingFuture(f), delay, unit);
+ return f;
+ }
+
+ private RunnableFuture<V> newTaskFor(Callable<V> task) {
+ return new FutureTask<V>(task);
+ }
+}