You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/09/30 19:03:00 UTC

svn commit: r1177734 - in /incubator/ambari/trunk/controller/src/main/java/org/apache/ambari: controller/ controller/rest/resources/ resource/statemachine/

Author: ddas
Date: Fri Sep 30 17:02:59 2011
New Revision: 1177734

URL: http://svn.apache.org/viewvc?rev=1177734&view=rev
Log:
AMBARI-10. Heartbeat iteration. Added retry for server actions.

Modified:
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Controller.java Fri Sep 30 17:02:59 2011
@@ -47,13 +47,18 @@ public class Controller {
   private static Controller instance = new Controller();
   private Server server = null;
   public volatile boolean running = true; // true while controller runs
-
+  private HeartbeatHandler hbHandler;
+  
   public static Controller getInstance() {
     return instance;
   }
   
+  public HeartbeatHandler getHeartbeatHandler() {
+    return hbHandler; //assigned in run(); still null if run() has not been called
+  }
+  
   public void run() {
-          
+    hbHandler = new HeartbeatHandler();      
     server = new Server(CONTROLLER_PORT);
 
     try {

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Fri Sep 30 17:02:59 2011
@@ -19,13 +19,12 @@ package org.apache.ambari.controller;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.xml.datatype.DatatypeConfigurationException;
 import javax.xml.datatype.DatatypeFactory;
@@ -39,34 +38,23 @@ import org.apache.ambari.common.rest.ent
 import org.apache.ambari.common.rest.entities.agent.ControllerResponse;
 import org.apache.ambari.common.rest.entities.agent.HeartBeat;
 import org.apache.ambari.common.rest.entities.agent.ServerStatus;
-import org.apache.ambari.common.rest.entities.agent.ServerStatus.State;
 import org.apache.ambari.components.ClusterContext;
 import org.apache.ambari.components.impl.ClusterContextImpl;
 import org.apache.ambari.components.impl.HDFSPluginImpl;
 import org.apache.ambari.resource.statemachine.Role;
+import org.apache.ambari.resource.statemachine.RoleEvent;
+import org.apache.ambari.resource.statemachine.RoleEventType;
 import org.apache.ambari.resource.statemachine.Service;
 import org.apache.ambari.resource.statemachine.StateMachineInvoker;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 
 public class HeartbeatHandler {
   
   private Map<String, ControllerResponse> agentToHeartbeatResponseMap = 
-      new TreeMap<String, ControllerResponse>();
+      Collections.synchronizedMap(new HashMap<String, ControllerResponse>());
   
-  LinkedBlockingQueue<HeartBeat> heartbeatQueue = 
-      new LinkedBlockingQueue<HeartBeat>();
+  private RetryCountForRoleServerAction retryCountForRole = new RetryCountForRoleServerAction();
   
-  Map<String, List<Action>> responseMap = 
-      new HashMap<String, List<Action>>();
-  
-  public void addActionsForNode(String hostname, List<Action> actions) {
-    synchronized (this) {
-      List<Action> currentActions = responseMap.get(hostname);
-      if (currentActions != null) {
-        currentActions.addAll(actions);
-      }
-    }
-  }
+  final short MAX_RETRY_COUNT = 3;
   
   public ControllerResponse processHeartBeat(HeartBeat heartbeat) 
       throws DatatypeConfigurationException, IOException {
@@ -92,7 +80,7 @@ public class HeartbeatHandler {
         Clusters.getInstance().getClusterByName(state.getClusterName());
     ClusterContext clusterContext = new ClusterContextImpl(cluster, node);
     
-    List <Action> allActions = new ArrayList<Action>();
+    List<Action> allActions = new ArrayList<Action>();
     
     if (heartbeat.getIdle()) {
       //if the command-execution takes longer than one heartbeat interval
@@ -101,15 +89,13 @@ public class HeartbeatHandler {
       //to reflect the command execution state more accurately.
       
       //get what is currently running on the node
-      List<ServerStatus> serverStatuses = heartbeat.getServersStatus();
+      List<ServerStatus> roleStatuses = heartbeat.getServersStatus();
       
-      //CHECK what servers moved the role to ACTIVE state
+      //what servers are running currently
       StartedComponentServers componentServers = new StartedComponentServers();
-      for (ServerStatus status : serverStatuses) {
-        if (status.getState() == State.STARTED) {
-          componentServers.serverStarted(status.getComponent(), 
-              status.getRole());
-        }
+      for (ServerStatus status : roleStatuses) {
+        componentServers.roleServerStarted(status.getComponent(), 
+            status.getServerName());
       }
 
       //get the state machine reference to the cluster
@@ -119,19 +105,60 @@ public class HeartbeatHandler {
       //the state machine reference to the services
       List<Service> clusterServices = clusterSMobject.getServices();
       //go through all the services, and check which role should be started
-      //Get the corresponding commands
       for (Service service : clusterServices) {
         List<Role> roles = service.getRoles();
+        
         for (Role role : roles) {
-          if (role.shouldStart() && 
-              !componentServers.isStarted(
-                  role.getAssociatedService().getServiceName(),
-                  role.getRoleName())) {
-            //TODO: get reference to the plugin impl
-            HDFSPluginImpl plugin = new HDFSPluginImpl();
-            List<Action> actions = 
-                plugin.startRoleServer(clusterContext, role.getRoleName());
-            allActions.addAll(actions);
+          boolean roleServerRunning = componentServers.isStarted(
+              role.getAssociatedService().getServiceName(),
+              role.getRoleName());
+          //TODO: get reference to the plugin impl for this service/component
+          HDFSPluginImpl plugin = new HDFSPluginImpl();
+          //check whether the agent should start any server
+          if (role.shouldStart()) {
+            if (!roleServerRunning) {
+              short retryCount = retryCountForRole.get(role);
+              if (retryCount > MAX_RETRY_COUNT) {
+                //LOG the failure to start the role server
+                StateMachineInvoker.getAMBARIEventHandler()
+                .handle(new RoleEvent(RoleEventType.S_START_FAILURE, role));
+                retryCountForRole.reset(role);
+                continue;
+              }
+              List<Action> actions = 
+                  plugin.startRoleServer(clusterContext, role.getRoleName());
+              allActions.addAll(actions);
+              retryCountForRole.incr(role);
+            }
+            //raise an event to the state machine for a successful role-start
+            if (roleServerRunning) {
+              retryCountForRole.reset(role);
+              StateMachineInvoker.getAMBARIEventHandler()
+              .handle(new RoleEvent(RoleEventType.S_START_SUCCESS, role));
+            }
+          }
+          //check whether the agent should stop any server
+          if (role.shouldStop()) {
+            if (roleServerRunning) {
+              short retryCount = retryCountForRole.get(role);
+              if (retryCount > MAX_RETRY_COUNT) {
+                //LOG the failure to stop the role server
+                StateMachineInvoker.getAMBARIEventHandler()
+                .handle(new RoleEvent(RoleEventType.S_STOP_FAILURE, role));
+                retryCountForRole.reset(role);
+                continue;
+              }
+              List<Action> actions = 
+                  plugin.stopRoleServer(clusterContext, role.getRoleName());
+              allActions.addAll(actions);
+              retryCountForRole.incr(role);
+            }
+            //raise an event to the state machine for a successful role-stop
+            if (!roleServerRunning) {
+              retryCountForRole.reset(role);
+              StateMachineInvoker.getAMBARIEventHandler()
+              .handle(new RoleEvent(RoleEventType.S_STOP_SUCCESS, role));
+            }
           }
         }
       }
@@ -146,7 +173,7 @@ public class HeartbeatHandler {
   private static class StartedComponentServers {
     private Map<String, Map<String, Boolean>> startedComponentServerMap =
         new HashMap<String, Map<String, Boolean>>();
-    void serverStarted(String component, String server) {
+    void roleServerStarted(String component, String server) {
       Map<String, Boolean> serverStartedMap = null;
       if ((serverStartedMap = startedComponentServerMap.get(component))
           != null) {
@@ -159,10 +186,28 @@ public class HeartbeatHandler {
     }
     boolean isStarted(String component, String server) {
       Map<String, Boolean> startedServerMap;
-      if ((startedServerMap=startedComponentServerMap.get(component)) != null){
+      if ((startedServerMap=startedComponentServerMap.get(component)) 
+          != null) {
         return startedServerMap.get(server) != null;
       }
       return false;
     }
   }
+  
+  /** Per-role counter of start/stop actions issued since the last reset. */
+  private static class RetryCountForRoleServerAction {
+    private Map<Role, Short> countMap = new HashMap<Role, Short>();
+    /** Returns the count recorded for the role, or 0 when none is recorded. */
+    public short get(Role role) {
+      //Map.get yields null for an unseen role; unboxing that null would throw
+      Short count = countMap.get(role);
+      return (count == null) ? 0 : count.shortValue();
+    }
+    public void incr(Role role) {
+      countMap.put(role, (short) (get(role) + 1));
+    }
+    public void reset(Role role) {
+      countMap.remove(role);
+    }
+  }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/resources/ControllerResource.java Fri Sep 30 17:02:59 2011
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.controller.rest.resources;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -32,6 +33,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
+import javax.xml.datatype.DatatypeConfigurationException;
 
 import org.apache.ambari.common.rest.entities.agent.Action;
 import org.apache.ambari.common.rest.entities.agent.Action.Kind;
@@ -43,6 +45,7 @@ import org.apache.ambari.common.rest.ent
 import org.apache.ambari.common.rest.entities.agent.HardwareProfile;
 import org.apache.ambari.common.rest.entities.agent.HeartBeat;
 import org.apache.ambari.common.rest.entities.agent.ServerStatus;
+import org.apache.ambari.controller.Controller;
 
 /** 
  * Controller Resource represents Ambari controller.
@@ -61,58 +64,17 @@ public class ControllerResource {
    * @response.representation.200.mediaType application/json
    * @response.representation.500.doc Error in accepting heartbeat message
    * @param message Heartbeat message
+   * @throws IOException 
+   * @throws DatatypeConfigurationException 
    */
   @Path(value = "/agent/{hostname}")
   @POST
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-  public ControllerResponse heartbeat(HeartBeat message) {
-    ControllerResponse controllerResponse = new ControllerResponse();
-    controllerResponse.setResponseId("id-00002");    
-    List<Command> commands = new ArrayList<Command>();
-    String[] cmd = { "ls", "-l" };
-    commands.add(new Command("root", cmd));
-    commands.add(new Command("root", cmd));
-    commands.add(new Command("root", cmd));
-
-    List<Command> cleanUps = new ArrayList<Command>();
-    String[] cleanUpCmd = { "ls", "-t" };
-    cleanUps.add(new Command("hdfs", cleanUpCmd));
-    cleanUps.add(new Command("hdfs", cleanUpCmd));
-    
-    Action action = new Action();
-    action.setUser("hdfs");
-    action.setComponent("hdfs");
-    action.setRole("datanode");
-    action.setKind(Kind.STOP_ACTION);
-    action.setSignal(Signal.KILL);
-    action.setClusterId("cluster-001");
-    action.setId("action-001");
-
-    Action action2 = new Action();
-    action2.setUser("hdfs");
-    action2.setKind(Kind.START_ACTION);
-    action2.setId("action-002");
-    action2.setClusterId("cluster-002");
-    action2.setCommands(commands);
-    action2.setCleanUpCommands(cleanUps);
-    action2.setComponent("hdfs");
-    action2.setRole("datanode");
-
-    Action action3 = new Action();
-    action3.setUser("hdfs");
-    action3.setKind(Kind.RUN_ACTION);
-    action3.setId("action-003");
-    action3.setClusterId("cluster-003");
-    action3.setCommands(commands);
-    action3.setCleanUpCommands(cleanUps);
-
-    List<Action> actions = new ArrayList<Action>();
-    actions.add(action);
-    actions.add(action2);
-    actions.add(action3);
-    controllerResponse.setActions(actions);
-    return controllerResponse;
+  public ControllerResponse heartbeat(HeartBeat message) 
+      throws DatatypeConfigurationException, IOException {
+    return Controller.getInstance()
+           .getHeartbeatHandler().processHeartBeat(message);
   }
 
   /**

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java Fri Sep 30 17:02:59 2011
@@ -47,13 +47,17 @@ public class RoleImpl implements Role, E
          RoleEvent>(RoleState.INACTIVE)
          .addTransition(RoleState.INACTIVE, RoleState.STARTING, RoleEventType.S_START, new RoleStartTransition())
          .addTransition(RoleState.STARTING, RoleState.ACTIVE, RoleEventType.S_START_SUCCESS, new SuccessStartTransition())
+         .addTransition(RoleState.ACTIVE, RoleState.ACTIVE, RoleEventType.S_START_SUCCESS)
          .addTransition(RoleState.STARTING, RoleState.FAIL, RoleEventType.S_START_FAILURE)
+         .addTransition(RoleState.FAIL, RoleState.FAIL, RoleEventType.S_START_FAILURE)
          .addTransition(RoleState.ACTIVE, RoleState.STOPPING, RoleEventType.S_STOP)
          .addTransition(RoleState.STOPPING, RoleState.INACTIVE, RoleEventType.S_STOP_SUCCESS)
          .addTransition(RoleState.STOPPING, RoleState.UNCLEAN_STOP, RoleEventType.S_STOP_FAILURE)
          .addTransition(RoleState.FAIL, RoleState.STOPPING, RoleEventType.S_STOP)
          .addTransition(RoleState.STOPPING, RoleState.INACTIVE, RoleEventType.S_STOP_SUCCESS)
+         .addTransition(RoleState.INACTIVE, RoleState.INACTIVE, RoleEventType.S_STOP_SUCCESS)
          .addTransition(RoleState.STOPPING, RoleState.UNCLEAN_STOP, RoleEventType.S_STOP_FAILURE)
+         .addTransition(RoleState.UNCLEAN_STOP, RoleState.UNCLEAN_STOP, RoleEventType.S_STOP_FAILURE)
          .installTopology();
   
   private final StateMachine<RoleState, RoleEventType, RoleEvent>

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java?rev=1177734&r1=1177733&r2=1177734&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java Fri Sep 30 17:02:59 2011
@@ -35,6 +35,7 @@ public class StateMachineInvoker {
     dispatcher = new AsyncDispatcher();
     dispatcher.register(ClusterEventType.class, new ClusterEventDispatcher());
     dispatcher.register(ServiceEventType.class, new ServiceEventDispatcher());
+    dispatcher.register(RoleEventType.class, new RoleEventDispatcher());
   }
 
   public Dispatcher getAMBARIDispatcher() {
@@ -61,6 +62,14 @@ public class StateMachineInvoker {
     }
   }
   
+  public static class RoleEventDispatcher 
+  implements EventHandler<RoleEvent> {
+    @Override
+    public void handle(RoleEvent event) {
+      ((EventHandler<RoleEvent>)event.getRole()).handle(event);
+    }
+  }
+  
   private static ConcurrentMap<String, Cluster> clusters = 
       new ConcurrentHashMap<String, Cluster>();
   public static Cluster createCluster(String clusterId) {