You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/09/30 19:03:00 UTC
svn commit: r1177734 - in
/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari:
controller/ controller/rest/resources/ resource/statemachine/
Author: ddas
Date: Fri Sep 30 17:02:59 2011
New Revision: 1177734
URL: http://svn.apache.org/viewvc?rev=1177734&view=rev
Log:
AMBARI-10. Heartbeat iteration. Added retry for server actions.
Modified:
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java Fri Sep 30 17:02:59 2011
@@ -47,13 +47,18 @@ public class Controller {
private static Controller instance = new Controller();
private Server server = null;
public volatile boolean running = true; // true while controller runs
-
+ private HeartbeatHandler hbHandler;
+
public static Controller getInstance() {
return instance;
}
+ public HeartbeatHandler getHeartbeatHandler() {
+ return hbHandler;
+ }
+
public void run() {
-
+ hbHandler = new HeartbeatHandler();
server = new Server(CONTROLLER_PORT);
try {
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Fri Sep 30 17:02:59 2011
@@ -19,13 +19,12 @@ package org.apache.ambari.controller;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
@@ -39,34 +38,23 @@ import org.apache.ambari.common.rest.ent
import org.apache.ambari.common.rest.entities.agent.ControllerResponse;
import org.apache.ambari.common.rest.entities.agent.HeartBeat;
import org.apache.ambari.common.rest.entities.agent.ServerStatus;
-import org.apache.ambari.common.rest.entities.agent.ServerStatus.State;
import org.apache.ambari.components.ClusterContext;
import org.apache.ambari.components.impl.ClusterContextImpl;
import org.apache.ambari.components.impl.HDFSPluginImpl;
import org.apache.ambari.resource.statemachine.Role;
+import org.apache.ambari.resource.statemachine.RoleEvent;
+import org.apache.ambari.resource.statemachine.RoleEventType;
import org.apache.ambari.resource.statemachine.Service;
import org.apache.ambari.resource.statemachine.StateMachineInvoker;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
public class HeartbeatHandler {
private Map<String, ControllerResponse> agentToHeartbeatResponseMap =
- new TreeMap<String, ControllerResponse>();
+ Collections.synchronizedMap(new HashMap<String, ControllerResponse>());
- LinkedBlockingQueue<HeartBeat> heartbeatQueue =
- new LinkedBlockingQueue<HeartBeat>();
+ private RetryCountForRoleServerAction retryCountForRole = new RetryCountForRoleServerAction();
- Map<String, List<Action>> responseMap =
- new HashMap<String, List<Action>>();
-
- public void addActionsForNode(String hostname, List<Action> actions) {
- synchronized (this) {
- List<Action> currentActions = responseMap.get(hostname);
- if (currentActions != null) {
- currentActions.addAll(actions);
- }
- }
- }
+ final short MAX_RETRY_COUNT = 3;
public ControllerResponse processHeartBeat(HeartBeat heartbeat)
throws DatatypeConfigurationException, IOException {
@@ -92,7 +80,7 @@ public class HeartbeatHandler {
Clusters.getInstance().getClusterByName(state.getClusterName());
ClusterContext clusterContext = new ClusterContextImpl(cluster, node);
- List <Action> allActions = new ArrayList<Action>();
+ List<Action> allActions = new ArrayList<Action>();
if (heartbeat.getIdle()) {
//if the command-execution takes longer than one heartbeat interval
@@ -101,15 +89,13 @@ public class HeartbeatHandler {
//to reflect the command execution state more accurately.
//get what is currently running on the node
- List<ServerStatus> serverStatuses = heartbeat.getServersStatus();
+ List<ServerStatus> roleStatuses = heartbeat.getServersStatus();
- //CHECK what servers moved the role to ACTIVE state
+ //what servers are running currently
StartedComponentServers componentServers = new StartedComponentServers();
- for (ServerStatus status : serverStatuses) {
- if (status.getState() == State.STARTED) {
- componentServers.serverStarted(status.getComponent(),
- status.getRole());
- }
+ for (ServerStatus status : roleStatuses) {
+ componentServers.roleServerStarted(status.getComponent(),
+ status.getServerName());
}
//get the state machine reference to the cluster
@@ -119,19 +105,60 @@ public class HeartbeatHandler {
//the state machine reference to the services
List<Service> clusterServices = clusterSMobject.getServices();
//go through all the services, and check which role should be started
- //Get the corresponding commands
for (Service service : clusterServices) {
List<Role> roles = service.getRoles();
+
for (Role role : roles) {
- if (role.shouldStart() &&
- !componentServers.isStarted(
- role.getAssociatedService().getServiceName(),
- role.getRoleName())) {
- //TODO: get reference to the plugin impl
- HDFSPluginImpl plugin = new HDFSPluginImpl();
- List<Action> actions =
- plugin.startRoleServer(clusterContext, role.getRoleName());
- allActions.addAll(actions);
+ boolean roleServerRunning = componentServers.isStarted(
+ role.getAssociatedService().getServiceName(),
+ role.getRoleName());
+ //TODO: get reference to the plugin impl for this service/component
+ HDFSPluginImpl plugin = new HDFSPluginImpl();
+ //check whether the agent should start any server
+ if (role.shouldStart()) {
+ if (!roleServerRunning) {
+ short retryCount = retryCountForRole.get(role);
+ if (retryCount > MAX_RETRY_COUNT) {
+ //LOG the failure to start the role server
+ StateMachineInvoker.getAMBARIEventHandler()
+ .handle(new RoleEvent(RoleEventType.S_START_FAILURE, role));
+ retryCountForRole.reset(role);
+ continue;
+ }
+ List<Action> actions =
+ plugin.startRoleServer(clusterContext, role.getRoleName());
+ allActions.addAll(actions);
+ retryCountForRole.incr(role);
+ }
+ //raise an event to the state machine for a successful role-start
+ if (roleServerRunning) {
+ retryCountForRole.reset(role);
+ StateMachineInvoker.getAMBARIEventHandler()
+ .handle(new RoleEvent(RoleEventType.S_START_SUCCESS, role));
+ }
+ }
+ //check whether the agent should stop any server
+ if (role.shouldStop()) {
+ if (roleServerRunning) {
+ short retryCount = retryCountForRole.get(role);
+ if (retryCount > MAX_RETRY_COUNT) {
+ //LOG the failure to stop the role server
+ StateMachineInvoker.getAMBARIEventHandler()
+ .handle(new RoleEvent(RoleEventType.S_STOP_FAILURE, role));
+ retryCountForRole.reset(role);
+ continue;
+ }
+ List<Action> actions =
+ plugin.stopRoleServer(clusterContext, role.getRoleName());
+ allActions.addAll(actions);
+ retryCountForRole.incr(role);
+ }
+ //raise an event to the state machine for a successful role-stop
+ if (!roleServerRunning) {
+ retryCountForRole.reset(role);
+ StateMachineInvoker.getAMBARIEventHandler()
+ .handle(new RoleEvent(RoleEventType.S_STOP_SUCCESS, role));
+ }
}
}
}
@@ -146,7 +173,7 @@ public class HeartbeatHandler {
private static class StartedComponentServers {
private Map<String, Map<String, Boolean>> startedComponentServerMap =
new HashMap<String, Map<String, Boolean>>();
- void serverStarted(String component, String server) {
+ void roleServerStarted(String component, String server) {
Map<String, Boolean> serverStartedMap = null;
if ((serverStartedMap = startedComponentServerMap.get(component))
!= null) {
@@ -159,10 +186,28 @@ public class HeartbeatHandler {
}
boolean isStarted(String component, String server) {
Map<String, Boolean> startedServerMap;
- if ((startedServerMap=startedComponentServerMap.get(component)) != null){
+ if ((startedServerMap=startedComponentServerMap.get(component))
+ != null) {
return startedServerMap.get(server) != null;
}
return false;
}
}
+
+ /**
+ * Tracks, per role, how many start/stop actions have been issued for the
+ * role's server since the count was last reset.
+ */
+ private static class RetryCountForRoleServerAction {
+ private Map<Role, Short> countMap = new HashMap<Role, Short>();
+ /**
+ * @param role the role to look up
+ * @return the recorded count for the role, or 0 if nothing is recorded
+ */
+ public short get(Role role) {
+ Short currentCount = countMap.get(role);
+ //a role with no entry has not been retried yet; returning the null
+ //directly would auto-unbox and throw a NullPointerException
+ if (currentCount == null) {
+ return 0;
+ }
+ return currentCount;
+ }
+ /**
+ * Increments the count for the role, starting from 0 if none is recorded.
+ * @param role the role whose count is bumped
+ */
+ public void incr(Role role) {
+ Short currentCount = countMap.get(role);
+ if (currentCount == null) {
+ currentCount = 0;
+ }
+ countMap.put(role, (short) (currentCount + 1));
+ }
+ /**
+ * Drops the recorded count for the role.
+ * @param role the role whose count is cleared
+ */
+ public void reset(Role role) {
+ countMap.remove(role);
+ }
+ }
}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java Fri Sep 30 17:02:59 2011
@@ -18,6 +18,7 @@
package org.apache.ambari.controller.rest.resources;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -32,6 +33,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
+import javax.xml.datatype.DatatypeConfigurationException;
import org.apache.ambari.common.rest.entities.agent.Action;
import org.apache.ambari.common.rest.entities.agent.Action.Kind;
@@ -43,6 +45,7 @@ import org.apache.ambari.common.rest.ent
import org.apache.ambari.common.rest.entities.agent.HardwareProfile;
import org.apache.ambari.common.rest.entities.agent.HeartBeat;
import org.apache.ambari.common.rest.entities.agent.ServerStatus;
+import org.apache.ambari.controller.Controller;
/**
* Controller Resource represents Ambari controller.
@@ -61,58 +64,17 @@ public class ControllerResource {
* @response.representation.200.mediaType application/json
* @response.representation.500.doc Error in accepting heartbeat message
* @param message Heartbeat message
+ * @throws IOException
+ * @throws DatatypeConfigurationException
*/
@Path(value = "/agent/{hostname}")
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- public ControllerResponse heartbeat(HeartBeat message) {
- ControllerResponse controllerResponse = new ControllerResponse();
- controllerResponse.setResponseId("id-00002");
- List<Command> commands = new ArrayList<Command>();
- String[] cmd = { "ls", "-l" };
- commands.add(new Command("root", cmd));
- commands.add(new Command("root", cmd));
- commands.add(new Command("root", cmd));
-
- List<Command> cleanUps = new ArrayList<Command>();
- String[] cleanUpCmd = { "ls", "-t" };
- cleanUps.add(new Command("hdfs", cleanUpCmd));
- cleanUps.add(new Command("hdfs", cleanUpCmd));
-
- Action action = new Action();
- action.setUser("hdfs");
- action.setComponent("hdfs");
- action.setRole("datanode");
- action.setKind(Kind.STOP_ACTION);
- action.setSignal(Signal.KILL);
- action.setClusterId("cluster-001");
- action.setId("action-001");
-
- Action action2 = new Action();
- action2.setUser("hdfs");
- action2.setKind(Kind.START_ACTION);
- action2.setId("action-002");
- action2.setClusterId("cluster-002");
- action2.setCommands(commands);
- action2.setCleanUpCommands(cleanUps);
- action2.setComponent("hdfs");
- action2.setRole("datanode");
-
- Action action3 = new Action();
- action3.setUser("hdfs");
- action3.setKind(Kind.RUN_ACTION);
- action3.setId("action-003");
- action3.setClusterId("cluster-003");
- action3.setCommands(commands);
- action3.setCleanUpCommands(cleanUps);
-
- List<Action> actions = new ArrayList<Action>();
- actions.add(action);
- actions.add(action2);
- actions.add(action3);
- controllerResponse.setActions(actions);
- return controllerResponse;
+ public ControllerResponse heartbeat(HeartBeat message)
+ throws DatatypeConfigurationException, IOException {
+ return Controller.getInstance()
+ .getHeartbeatHandler().processHeartBeat(message);
}
/**
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java Fri Sep 30 17:02:59 2011
@@ -47,13 +47,17 @@ public class RoleImpl implements Role, E
RoleEvent>(RoleState.INACTIVE)
.addTransition(RoleState.INACTIVE, RoleState.STARTING, RoleEventType.S_START, new RoleStartTransition())
.addTransition(RoleState.STARTING, RoleState.ACTIVE, RoleEventType.S_START_SUCCESS, new SuccessStartTransition())
+ .addTransition(RoleState.ACTIVE, RoleState.ACTIVE, RoleEventType.S_START_SUCCESS)
.addTransition(RoleState.STARTING, RoleState.FAIL, RoleEventType.S_START_FAILURE)
+ .addTransition(RoleState.FAIL, RoleState.FAIL, RoleEventType.S_START_FAILURE)
.addTransition(RoleState.ACTIVE, RoleState.STOPPING, RoleEventType.S_STOP)
.addTransition(RoleState.STOPPING, RoleState.INACTIVE, RoleEventType.S_STOP_SUCCESS)
.addTransition(RoleState.STOPPING, RoleState.UNCLEAN_STOP, RoleEventType.S_STOP_FAILURE)
.addTransition(RoleState.FAIL, RoleState.STOPPING, RoleEventType.S_STOP)
.addTransition(RoleState.STOPPING, RoleState.INACTIVE, RoleEventType.S_STOP_SUCCESS)
+ .addTransition(RoleState.INACTIVE, RoleState.INACTIVE, RoleEventType.S_STOP_SUCCESS)
.addTransition(RoleState.STOPPING, RoleState.UNCLEAN_STOP, RoleEventType.S_STOP_FAILURE)
+ .addTransition(RoleState.UNCLEAN_STOP, RoleState.UNCLEAN_STOP, RoleEventType.S_STOP_FAILURE)
.installTopology();
private final StateMachine<RoleState, RoleEventType, RoleEvent>
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java Fri Sep 30 17:02:59 2011
@@ -35,6 +35,7 @@ public class StateMachineInvoker {
dispatcher = new AsyncDispatcher();
dispatcher.register(ClusterEventType.class, new ClusterEventDispatcher());
dispatcher.register(ServiceEventType.class, new ServiceEventDispatcher());
+ dispatcher.register(RoleEventType.class, new RoleEventDispatcher());
}
public Dispatcher getAMBARIDispatcher() {
@@ -61,6 +62,14 @@ public class StateMachineInvoker {
}
}
+ public static class RoleEventDispatcher
+ implements EventHandler<RoleEvent> {
+ @Override
+ public void handle(RoleEvent event) {
+ ((EventHandler<RoleEvent>)event.getRole()).handle(event);
+ }
+ }
+
private static ConcurrentMap<String, Cluster> clusters =
new ConcurrentHashMap<String, Cluster>();
public static Cluster createCluster(String clusterId) {