You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by mp...@apache.org on 2017/01/13 16:31:12 UTC
[1/2] ambari git commit: AMBARI-19530. Deploy Job failed saying
"Command aborted. Reason: 'Stage timeout'". (mpapirkovskyy)
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 b117a56b1 -> 63f175515
refs/heads/trunk fb50d88f1 -> 3283d9f20
AMBARI-19530. Deploy Job failed saying "Command aborted. Reason: 'Stage timeout'". (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/63f17551
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/63f17551
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/63f17551
Branch: refs/heads/branch-2.5
Commit: 63f17551512b70f554484670dfcc6ca0ebc5e3fa
Parents: b117a56
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Fri Jan 13 17:45:32 2017 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Fri Jan 13 17:46:44 2017 +0200
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 18 ++++++++----
.../apache/ambari/server/agent/ActionQueue.java | 31 +++++++++++++++++++-
.../actionmanager/TestActionScheduler.java | 5 +++-
3 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/63f17551/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e80b020..dabcb98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -40,6 +40,7 @@ import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.CommandReport;
@@ -78,6 +79,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.eventbus.Subscribe;
import com.google.common.reflect.TypeToken;
import com.google.inject.Inject;
@@ -403,8 +405,10 @@ class ActionScheduler implements Runnable {
// Commands that will be scheduled in current scheduler wakeup
List<ExecutionCommand> commandsToSchedule = new ArrayList<>();
+ Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
- Map<String, RoleStats> roleStats = processInProgressStage(stage, commandsToSchedule);
+ Map<String, RoleStats> roleStats =
+ processInProgressStage(stage, commandsToSchedule, commandsToEnqueue);
// Check if stage is failed
boolean failed = false;
@@ -503,9 +507,10 @@ class ActionScheduler implements Runnable {
if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) {
serverActionExecutor.awake();
} else {
- actionQueue.enqueue(cmd.getHostname(), cmd);
+ commandsToEnqueue.put(cmd.getHostname(), cmd);
}
}
+ actionQueue.enqueueAll(commandsToEnqueue.asMap());
LOG.debug("==> Finished.");
if (!configuration.getParallelStageExecution()) { // If disabled
@@ -698,7 +703,8 @@ class ActionScheduler implements Runnable {
* @return the stats for the roles in the stage which are used to determine
* whether stage has succeeded or failed
*/
- protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule) throws AmbariException {
+ protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule,
+ Multimap<String, AgentCommand> commandsToEnqueue) throws AmbariException {
LOG.debug("==> Collecting commands to schedule...");
// Map to track role status
Map<String, RoleStats> roleStats = initRoleStats(s);
@@ -830,7 +836,7 @@ class ActionScheduler implements Runnable {
LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
actionQueue.dequeue(host, c.getCommandId());
} else {
- cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)));
+ cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)), commandsToEnqueue);
LOG.info("Host: {}, role: {}, actionId: {} timed out and will be rescheduled", host,
roleStr, s.getActionId());
@@ -1238,7 +1244,7 @@ class ActionScheduler implements Runnable {
}
}
- void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) {
+ void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<String, AgentCommand> commandsToEnqueue) {
for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
// There are no server actions in actionQueue
if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) {
@@ -1247,7 +1253,7 @@ class ActionScheduler implements Runnable {
CancelCommand cancelCommand = new CancelCommand();
cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
cancelCommand.setReason("Stage timeout");
- actionQueue.enqueue(hostRoleCommand.getHostName(), cancelCommand);
+ commandsToEnqueue.put(hostRoleCommand.getHostName(), cancelCommand);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63f17551/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
index 6c7803e..d6d5a35 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
@@ -18,10 +18,12 @@
package org.apache.ambari.server.agent;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -59,6 +61,24 @@ public class ActionQueue {
* @throws NullPointerException - if hostname is {@code}null{@code}
*/
public void enqueue(String hostname, AgentCommand cmd) {
+ Queue<AgentCommand> q = getHostQueue(hostname);
+
+ q.add(cmd);
+ }
+
+ /**
+ * Adds commands to the queue for the given hostname. Note: Queue.addAll is not guaranteed to be atomic for concurrent queues.
+ * @param hostname - hostname of node
+ * @param commands - list of commands to add to queue
+ * @throws NullPointerException - if hostname is {@code null}
+ */
+ public void enqueue(String hostname, Collection<AgentCommand> commands) {
+ Queue<AgentCommand> q = getHostQueue(hostname);
+
+ q.addAll(commands);
+ }
+
+ private Queue<AgentCommand> getHostQueue(String hostname) {
Queue<AgentCommand> q = getQueue(hostname);
if (q == null) {
@@ -70,8 +90,17 @@ public class ActionQueue {
}
//otherwise we got existing queue (and put nothing!)
}
+ return q;
+ }
- q.add(cmd);
+ /**
+ * Adds every command in the map to the queue of its host
+ * @param commandMap - map with hostname as key and command list as value
+ */
+ public void enqueueAll(Map<String, Collection<AgentCommand>> commandMap) {
+ for (Map.Entry<String, Collection<AgentCommand>> entry : commandMap.entrySet()) {
+ enqueue(entry.getKey(), entry.getValue());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/63f17551/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index f86c02e..a613a3b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -106,6 +106,8 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -935,6 +937,7 @@ public class TestActionScheduler {
aq.enqueue(Stage.INTERNAL_HOSTNAME, s.getExecutionCommands(null).get(0).getExecutionCommand());
List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
+ Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
boolean taskShouldBeSkipped = stageSupportsAutoSkip && autoSkipFailedTask;
db.timeoutHostRole(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong(),
@@ -953,7 +956,7 @@ public class TestActionScheduler {
EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost);
- scheduler.processInProgressStage(s, commandsToSchedule);
+ scheduler.processInProgressStage(s, commandsToSchedule, commandsToEnqueue);
EasyMock.verify(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost);
[2/2] ambari git commit: AMBARI-19530. Deploy Job failed saying
"Command aborted. Reason: 'Stage timeout'". (mpapirkovskyy)
Posted by mp...@apache.org.
AMBARI-19530. Deploy Job failed saying "Command aborted. Reason: 'Stage timeout'". (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3283d9f2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3283d9f2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3283d9f2
Branch: refs/heads/trunk
Commit: 3283d9f20c35686d41edf7f9bb0430c8dd77eb24
Parents: fb50d88
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Fri Jan 13 17:45:32 2017 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Fri Jan 13 18:15:03 2017 +0200
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 18 ++++++++----
.../apache/ambari/server/agent/ActionQueue.java | 31 +++++++++++++++++++-
.../actionmanager/TestActionScheduler.java | 5 +++-
3 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/3283d9f2/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e80b020..dabcb98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -40,6 +40,7 @@ import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.CommandReport;
@@ -78,6 +79,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.eventbus.Subscribe;
import com.google.common.reflect.TypeToken;
import com.google.inject.Inject;
@@ -403,8 +405,10 @@ class ActionScheduler implements Runnable {
// Commands that will be scheduled in current scheduler wakeup
List<ExecutionCommand> commandsToSchedule = new ArrayList<>();
+ Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
- Map<String, RoleStats> roleStats = processInProgressStage(stage, commandsToSchedule);
+ Map<String, RoleStats> roleStats =
+ processInProgressStage(stage, commandsToSchedule, commandsToEnqueue);
// Check if stage is failed
boolean failed = false;
@@ -503,9 +507,10 @@ class ActionScheduler implements Runnable {
if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) {
serverActionExecutor.awake();
} else {
- actionQueue.enqueue(cmd.getHostname(), cmd);
+ commandsToEnqueue.put(cmd.getHostname(), cmd);
}
}
+ actionQueue.enqueueAll(commandsToEnqueue.asMap());
LOG.debug("==> Finished.");
if (!configuration.getParallelStageExecution()) { // If disabled
@@ -698,7 +703,8 @@ class ActionScheduler implements Runnable {
* @return the stats for the roles in the stage which are used to determine
* whether stage has succeeded or failed
*/
- protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule) throws AmbariException {
+ protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule,
+ Multimap<String, AgentCommand> commandsToEnqueue) throws AmbariException {
LOG.debug("==> Collecting commands to schedule...");
// Map to track role status
Map<String, RoleStats> roleStats = initRoleStats(s);
@@ -830,7 +836,7 @@ class ActionScheduler implements Runnable {
LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
actionQueue.dequeue(host, c.getCommandId());
} else {
- cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)));
+ cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)), commandsToEnqueue);
LOG.info("Host: {}, role: {}, actionId: {} timed out and will be rescheduled", host,
roleStr, s.getActionId());
@@ -1238,7 +1244,7 @@ class ActionScheduler implements Runnable {
}
}
- void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) {
+ void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<String, AgentCommand> commandsToEnqueue) {
for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
// There are no server actions in actionQueue
if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) {
@@ -1247,7 +1253,7 @@ class ActionScheduler implements Runnable {
CancelCommand cancelCommand = new CancelCommand();
cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
cancelCommand.setReason("Stage timeout");
- actionQueue.enqueue(hostRoleCommand.getHostName(), cancelCommand);
+ commandsToEnqueue.put(hostRoleCommand.getHostName(), cancelCommand);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3283d9f2/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
index 6c7803e..d6d5a35 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
@@ -18,10 +18,12 @@
package org.apache.ambari.server.agent;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -59,6 +61,24 @@ public class ActionQueue {
* @throws NullPointerException - if hostname is {@code}null{@code}
*/
public void enqueue(String hostname, AgentCommand cmd) {
+ Queue<AgentCommand> q = getHostQueue(hostname);
+
+ q.add(cmd);
+ }
+
+ /**
+ * Adds commands to the queue for the given hostname. Note: Queue.addAll is not guaranteed to be atomic for concurrent queues.
+ * @param hostname - hostname of node
+ * @param commands - list of commands to add to queue
+ * @throws NullPointerException - if hostname is {@code null}
+ */
+ public void enqueue(String hostname, Collection<AgentCommand> commands) {
+ Queue<AgentCommand> q = getHostQueue(hostname);
+
+ q.addAll(commands);
+ }
+
+ private Queue<AgentCommand> getHostQueue(String hostname) {
Queue<AgentCommand> q = getQueue(hostname);
if (q == null) {
@@ -70,8 +90,17 @@ public class ActionQueue {
}
//otherwise we got existing queue (and put nothing!)
}
+ return q;
+ }
- q.add(cmd);
+ /**
+ * Adds every command in the map to the queue of its host
+ * @param commandMap - map with hostname as key and command list as value
+ */
+ public void enqueueAll(Map<String, Collection<AgentCommand>> commandMap) {
+ for (Map.Entry<String, Collection<AgentCommand>> entry : commandMap.entrySet()) {
+ enqueue(entry.getKey(), entry.getValue());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/3283d9f2/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index d73a3db..6cc511e 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -106,6 +106,8 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -935,6 +937,7 @@ public class TestActionScheduler {
aq.enqueue(Stage.INTERNAL_HOSTNAME, s.getExecutionCommands(null).get(0).getExecutionCommand());
List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
+ Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
boolean taskShouldBeSkipped = stageSupportsAutoSkip && autoSkipFailedTask;
db.timeoutHostRole(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong(),
@@ -953,7 +956,7 @@ public class TestActionScheduler {
EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost);
- scheduler.processInProgressStage(s, commandsToSchedule);
+ scheduler.processInProgressStage(s, commandsToSchedule, commandsToEnqueue);
EasyMock.verify(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost);