You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by vg...@apache.org on 2011/11/03 08:48:31 UTC

svn commit: r1196984 - in /incubator/ambari/trunk: ./ client/src/main/java/org/apache/ambari/common/rest/entities/ controller/src/main/java/org/apache/ambari/controller/ controller/src/main/java/org/apache/ambari/controller/rest/config/ controller/src/...

Author: vgogate
Date: Thu Nov  3 07:48:30 2011
New Revision: 1196984

URL: http://svn.apache.org/viewvc?rev=1196984&view=rev
Log:
AMBARI-124

Added:
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java
Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java
    incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Thu Nov  3 07:48:30 2011
@@ -2,6 +2,10 @@ Ambari Change log
 
 Release 0.1.0 - unreleased
 
+  AMBARI-124. Add Zookeeper Data store and persist the cluster definitions across controller restart (vgogate)
+
+  AMBARI-116. Change the name group to provider in hadoop-security-0.xml stack definition (vgogate)
+
   AMBARI-120. Fixed REST resource annotation bugs. (Eric Yang)
 
   AMBARI-121. Added examples for returning REST resources. (Eric Yang)

Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java Thu Nov  3 07:48:30 2011
@@ -70,6 +70,17 @@ public class ClusterDefinition {
     protected String name = null;
     
     /**
+     * Every cluster update creates a new revision, which is returned through this field.
+     * This field can optionally be set during the update to the latest revision
+     * (currently checked out revision) of the cluster being updated; if so,
+     * Ambari will prevent the update if the latest revision of the cluster changed
+     * in the background before the update. If not specified, the update will overwrite
+     * the current latest revision.
+     */
+    @XmlAttribute
+    protected String revision = null;
+  
+    /**
      * A user-facing comment about the cluster about what it is intended for.
      */
     @XmlAttribute
@@ -116,6 +127,13 @@ public class ClusterDefinition {
     
     
     /**
+     * @return the roleToNodesMap
+     */
+    public List<RoleToNodes> getRoleToNodesMap() {
+        return roleToNodesMap;
+    }
+
+    /**
      * @return the stackRevision
      */
     public String getStackRevision() {
@@ -226,4 +244,20 @@ public class ClusterDefinition {
     public void setRoleToNodesMap(List<RoleToNodes> roleToNodesMap) {
             this.roleToNodesMap = roleToNodesMap;
     }
+    
+    
+    /**
+     * @return the revision
+     */
+    public String getRevision() {
+        return revision;
+    }
+
+    /**
+     * @param revision the revision to set
+     */
+    public void setRevision(String revision) {
+        this.revision = revision;
+    }
+
 }

Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java Thu Nov  3 07:48:30 2011
@@ -17,18 +17,13 @@
  */
 package org.apache.ambari.common.rest.entities;
 
-import java.util.Date;
-import java.util.GregorianCalendar;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlSchemaType;
-import javax.xml.datatype.DatatypeFactory;
 import javax.xml.datatype.XMLGregorianCalendar;
 
-
 /**
  * The state of a cluster.
  * 
@@ -106,19 +101,6 @@ public class ClusterState {
     public void setCreationTime(XMLGregorianCalendar creationTime) {
             this.creationTime = creationTime;
     }
-    
-    /**
-     * @param creationTime the creationTime to set
-     */
-    public void setCreationTime(Date creationTime) throws Exception {
-        if (creationTime == null) {
-            this.creationTime = null;
-        } else {
-            GregorianCalendar cal = new GregorianCalendar();
-            cal.setTime(creationTime);
-            this.creationTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);
-        }
-    }
 
     /**
      * @return the deployTime
@@ -133,19 +115,6 @@ public class ClusterState {
     public void setDeployTime(XMLGregorianCalendar deployTime) {
             this.deployTime = deployTime;
     }
-
-    /**
-     * @param creationTime the creationTime to set
-     */
-    public void setDeployTime(Date deployTime) throws Exception {
-        if (deployTime == null) {
-            this.deployTime = null;
-        } else {
-            GregorianCalendar cal = new GregorianCalendar();
-            cal.setTime(deployTime);
-            this.deployTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);
-        }
-    }
     
     /**
      * @return the lastUpdateTime
@@ -160,19 +129,6 @@ public class ClusterState {
     public void setLastUpdateTime(XMLGregorianCalendar lastUpdateTime) {
             this.lastUpdateTime = lastUpdateTime;
     }
-
-    /**
-     * @param creationTime the creationTime to set
-     */
-    public void setLastUpdateTime(Date lastUpdateTime) throws Exception {
-        if (lastUpdateTime == null) {
-            this.lastUpdateTime = null;
-        } else {
-            GregorianCalendar cal = new GregorianCalendar();
-            cal.setTime(lastUpdateTime);
-            this.lastUpdateTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);
-        }
-    }
     
     /**
      * @return the state

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java Thu Nov  3 07:48:30 2011
@@ -18,6 +18,7 @@
 package org.apache.ambari.controller;
 
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,109 +29,141 @@ import org.apache.ambari.common.rest.ent
 import org.apache.ambari.common.rest.entities.Component;
 import org.apache.ambari.components.ComponentPlugin;
 import org.apache.ambari.components.impl.XmlComponentDefinition;
+import org.apache.ambari.datastore.DataStoreFactory;
+import org.apache.ambari.datastore.PersistentDataStore;
 
 
 public class Cluster {
         
     /*
+     * Data Store 
+     */
+    private PersistentDataStore dataStore = DataStoreFactory.getDataStore(DataStoreFactory.ZOOKEEPER_TYPE);
+   
+    /*
      * Latest revision of cluster definition
      */
-    private long latestRevision = 0;
+    private String clusterName = null;
+    private int latestRevisionNumber = -1;
+    private ClusterDefinition latestDefinition = null;
     
-    /**
-     * @return the latestRevision
-     */
-    public long getLatestRevision() {
-        return latestRevision;
-    }
-
     /*
      * Map of cluster revision to cluster definition
      */
-    private final Map<Long, ClusterDefinition> clusterDefinitionRevisionsList = 
-        new ConcurrentHashMap<Long, ClusterDefinition>();
-    private ClusterState clusterState;
-    private ClusterDefinition definition;
+    private final Map<Integer, ClusterDefinition> clusterDefinitionRevisionsList = 
+                    new ConcurrentHashMap<Integer, ClusterDefinition>();
     private final Map<String, ComponentPlugin> plugins =
-        new HashMap<String, ComponentPlugin>();
+                  new HashMap<String, ComponentPlugin>();
     
     
+    public Cluster (String clusterName) {
+        this.clusterName = clusterName;
+    }
+    
+    public Cluster (ClusterDefinition c, ClusterState cs) throws Exception {
+        this.clusterName = c.getName();
+        this.updateClusterDefinition(c);
+        this.updateClusterState(cs);
+    }
+    
+    public synchronized void init () throws Exception {
+        this.latestRevisionNumber = dataStore.retrieveLatestClusterRevisionNumber(clusterName);
+        this.latestDefinition = dataStore.retrieveClusterDefinition(clusterName, this.latestRevisionNumber);
+        loadPlugins(this.latestDefinition);
+        //this.clusterState = dataStore.retrieveClusterState(clusterName);  
+        this.clusterDefinitionRevisionsList.put(this.latestRevisionNumber, this.latestDefinition);
+    }
+    
     /**
      * @return the clusterDefinition
      */
-    public synchronized ClusterDefinition getClusterDefinition(long revision) {
-        return clusterDefinitionRevisionsList.get(revision);
+    public synchronized ClusterDefinition getClusterDefinition(int revision) throws IOException {
+        ClusterDefinition cdef = null;
+        if (revision < 0) {
+            cdef = this.latestDefinition;
+        } else {
+            if (!this.clusterDefinitionRevisionsList.containsKey(revision)) {
+                cdef = dataStore.retrieveClusterDefinition(clusterName, revision);
+                if (!this.clusterDefinitionRevisionsList.containsKey(revision)) {
+                    this.clusterDefinitionRevisionsList.put(revision, cdef);
+                }
+            } else {
+                cdef = this.clusterDefinitionRevisionsList.get(revision);
+            }
+        }
+        return cdef;
     }
     
     /**
-     * @return the latest clusterDefinition
+     * @return the latestRevision
      */
-    public synchronized ClusterDefinition getLatestClusterDefinition() {
-        return definition;
+    public int getLatestRevisionNumber() {
+        return this.latestRevisionNumber;
     }
     
     /**
      * @return Add Cluster definition
      */
-    public synchronized 
-    void addClusterDefinition(ClusterDefinition c) throws Exception {
-      this.latestRevision++;
-      clusterDefinitionRevisionsList.put((long)this.latestRevision, c);
-      definition = c;
-      // find the plugins for the current definition of the cluster
-      Stacks context = Stacks.getInstance();
-      Stack bp = context.getStack(c.getStackName(),
-                                   Integer.parseInt(c.getStackRevision()));
+    public synchronized void updateClusterDefinition(ClusterDefinition c) throws Exception {
+      this.latestRevisionNumber = dataStore.storeClusterDefinition(c);
+      this.clusterDefinitionRevisionsList.put(this.latestRevisionNumber, c);
+      this.latestDefinition = c;
       
-      while (bp != null) {
-        for(Component comp: bp.getComponents()) {
-          String name = comp.getName();
-          if (!plugins.containsKey(name) && comp.getDefinition() != null) {
-            plugins.put(name, new XmlComponentDefinition(comp.getDefinition()));
-          }
-        }
+      // find the plugins for the current definition of the cluster
+      loadPlugins(c);
+    }
+
+    /*
+     * Load plugins for the current definition of the cluster
+     */
+    private void loadPlugins (ClusterDefinition c) throws Exception {
         
-        // go up to the parent
-        if (bp.getParentName() != null) {
-          bp = context.getStack(bp.getParentName(), 
-                                  Integer.parseInt(bp.getParentRevision()));
-        } else {
-          bp = null;
+        Stacks context = Stacks.getInstance();
+        Stack bp = context.getStack(c.getStackName(),
+                                     Integer.parseInt(c.getStackRevision()));
+        
+        while (bp != null) {
+          for(Component comp: bp.getComponents()) {
+            String name = comp.getName();
+            if (!plugins.containsKey(name) && comp.getDefinition() != null) {
+              plugins.put(name, new XmlComponentDefinition(comp.getDefinition()));
+            }
+          }
+          
+          // go up to the parent
+          if (bp.getParentName() != null) {
+            bp = context.getStack(bp.getParentName(), 
+                                    Integer.parseInt(bp.getParentRevision()));
+          } else {
+            bp = null;
+          }
         }
-      }
     }
     
     /**
-     * @return the clusterDefinitionList
-     */
-    public Map<Long, ClusterDefinition> getClusterDefinitionRevisionsList() {
-        return clusterDefinitionRevisionsList;
-    }
-
-    /**
      * @return the clusterState
      */
-    public ClusterState getClusterState() {
-            return clusterState;
+    public ClusterState getClusterState() throws IOException {
+        return dataStore.retrieveClusterState(this.clusterName);
     }
     
     /**
      * @param clusterState the clusterState to set
      */
-    public void setClusterState(ClusterState clusterState) {
-            this.clusterState = clusterState;
+    public void updateClusterState(ClusterState clusterState) throws IOException {
+        dataStore.storeClusterState(this.clusterName, clusterState);
     }
     
-    public synchronized String getName() {
-      return definition.getName();
+    public String getName() {
+        return this.latestDefinition.getName();
     }
 
     public synchronized Iterable<String> getComponents() {
-      return plugins.keySet();
+        return this.plugins.keySet();
     }
     
     public synchronized 
     ComponentPlugin getComponentDefinition(String component) {
-      return plugins.get(component);
+        return this.plugins.get(component);
     }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java Thu Nov  3 07:48:30 2011
@@ -17,11 +17,11 @@
  */
 package org.apache.ambari.controller;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.StringTokenizer;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.ws.rs.WebApplicationException;
@@ -33,19 +33,22 @@ import org.apache.ambari.common.rest.ent
 import org.apache.ambari.common.rest.entities.ClusterState;
 import org.apache.ambari.common.rest.entities.Node;
 import org.apache.ambari.common.rest.entities.RoleToNodes;
+import org.apache.ambari.datastore.DataStoreFactory;
+import org.apache.ambari.datastore.PersistentDataStore;
 import org.apache.ambari.resource.statemachine.ClusterFSM;
 import org.apache.ambari.resource.statemachine.StateMachineInvoker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class Clusters {
+    // TODO: replace System.out.print calls with LOG
     private static Log LOG = LogFactory.getLog(Clusters.class);
     
     /*
      * Operational clusters include both active and inactive clusters
      */
     protected ConcurrentHashMap<String, Cluster> operational_clusters = new ConcurrentHashMap<String, Cluster>();
-    
+    protected PersistentDataStore dataStore = DataStoreFactory.getDataStore(DataStoreFactory.ZOOKEEPER_TYPE);
     
     private static Clusters ClustersTypeRef=null;
         
@@ -129,8 +132,12 @@ public class Clusters {
         cluster124.setRoleToNodesMap(rnm);
         
         try {
-            addCluster(cluster123, false);
-            addCluster(cluster124, false);
+            if (!clusterExists(cluster123.getName())) {
+                addCluster(cluster123, false);
+            }
+            if (!clusterExists(cluster124.getName())) {
+                addCluster(cluster124, false);
+            }
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -148,13 +155,56 @@ public class Clusters {
     }
 
     /*
+     * Wrapper method over datastore API
+     */
+    public boolean clusterExists(String clusterName) throws IOException {
+        int x = 0;
+        if (!this.operational_clusters.containsKey(clusterName) &&
+            dataStore.clusterExists(clusterName) == false) {
+            return false;
+        }
+        return true;
+    }
+    
+    /* 
+     * Get the cluster by name
+     * Wrapper over datastore API
+     */
+    public synchronized Cluster getClusterByName(String clusterName) throws Exception {
+        if (clusterExists(clusterName)) {
+            if (!this.operational_clusters.containsKey(clusterName)) {
+                Cluster cls = new Cluster(clusterName);
+                cls.init();
+                this.operational_clusters.put(clusterName, cls);
+            }
+            return this.operational_clusters.get(clusterName);
+        } else {
+            return null;
+        }
+    }
+    
+    /*
+     * Purge the cluster entry from memory and the data store
+     */
+    public synchronized void purgeClusterEntry (String clusterName) throws IOException {
+        dataStore.deleteCluster(clusterName);
+        this.operational_clusters.remove(clusterName);
+    }
+    
+    /*
+     * Add Cluster Entry
+     */
+    public synchronized Cluster addClusterEntry (ClusterDefinition cdef, ClusterState cs) throws Exception {
+        Cluster cls = new Cluster (cdef, cs);
+        this.operational_clusters.put(cdef.getName(), cls);
+        return cls;
+    }
+    
+    /*
      * Rename the cluster
      */
-    public void renameCluster(String clusterName, String new_name) throws Exception {
-        /*
-         * 
-         */
-        if (!this.operational_clusters.containsKey(clusterName)) {
+    public synchronized void renameCluster(String clusterName, String new_name) throws Exception {
+        if (!clusterExists(clusterName)) {
             String msg = "Cluster ["+clusterName+"] does not exist";
             throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
         }
@@ -164,155 +214,172 @@ public class Clusters {
             throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.BAD_REQUEST)).get());
         }
         
-        synchronized (operational_clusters) {
-            /*
-             * Check if cluster state is ATTAIC, If yes update the name
-             * don't make new revision of cluster definition as it is in ATTIC state
-             */
-            if (!this.operational_clusters.get(clusterName).getClusterState().getState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
-                String msg = "Cluster state is not ATTIC. Cluster is only allowed to be renamed in ATTIC state";
-                throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_ACCEPTABLE)).get());
-            }
-            
-            Cluster x = this.operational_clusters.get(clusterName);
-            x.getLatestClusterDefinition().setName(new_name);
-            this.operational_clusters.remove(clusterName);
-            this.operational_clusters.put(new_name, x);
+        /*
+         * Check if the cluster state is ATTIC. If yes, update the name;
+         * don't make a new revision of the cluster definition as it is in ATTIC state
+         */
+        if (!getClusterByName(clusterName).getClusterState().getState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
+            String msg = "Cluster state is not ATTIC. Cluster is only allowed to be renamed in ATTIC state";
+            throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_ACCEPTABLE)).get());
         }
-     
+        
+        Cluster x = this.getClusterByName(clusterName);
+        ClusterDefinition cdef = x.getClusterDefinition(-1);
+        cdef.setName(new_name);
+        ClusterState cs = x.getClusterState();
+        this.addClusterEntry(cdef, cs);
+        this.purgeClusterEntry(clusterName);
     }
     
     /* 
      * Create/Update cluster definition 
      * TODO: As nodes or role to node association changes, validate key services nodes are not removed
     */
-    public ClusterDefinition updateCluster(String clusterName, ClusterDefinition c, boolean dry_run) throws Exception {
-        
+    public synchronized ClusterDefinition updateCluster(String clusterName, ClusterDefinition c, boolean dry_run) throws Exception {       
         /*
-         * if cluster does not exist create it  
+         * Add new cluster if cluster does not exist
          */
-        synchronized (this.operational_clusters) {
-            if (!this.operational_clusters.containsKey(clusterName)) {
-                return addCluster(c, dry_run);
-            }
+        if (!clusterExists(clusterName)) {
+            return addCluster(c, dry_run);
         }
         
-        Cluster cls = this.operational_clusters.get(clusterName);
         /*
          * Time being we will keep entire updated copy as new revision
          */
+        Cluster cls = getClusterByName(clusterName);
         ClusterDefinition newcd = new ClusterDefinition ();
+        newcd.setName(clusterName);
+        if (c.getStackName() != null) {
+            newcd.setStackName(c.getStackName());
+        } else {
+            newcd.setStackName(cls.getClusterDefinition(-1).getStackName());
+        }
+        if (c.getStackRevision() != null) {
+            newcd.setStackRevision(c.getStackRevision());
+        } else {
+            newcd.setStackRevision(cls.getClusterDefinition(-1).getStackRevision());
+        }
+        if (c.getDescription() != null) {
+            newcd.setDescription(c.getDescription());
+        } else {
+            newcd.setDescription(cls.getClusterDefinition(-1).getDescription());
+        }
+        if (c.getGoalState() != null) {
+            newcd.setGoalState(c.getGoalState());
+        } else {
+            newcd.setGoalState(cls.getClusterDefinition(-1).getGoalState());
+        }
+        if (c.getActiveServices() != null) {
+            newcd.setActiveServices(c.getActiveServices());
+        } else {
+            newcd.setActiveServices(cls.getClusterDefinition(-1).getActiveServices());
+        }
         
-        synchronized (cls.getClusterDefinitionRevisionsList()) {
-            newcd.setName(clusterName);
-            if (c.getStackName() != null) {
-                newcd.setStackName(c.getStackName());
-            } else {
-                newcd.setStackName(cls.getLatestClusterDefinition().getStackName());
-            }
-            if (c.getStackRevision() != null) {
-                newcd.setStackRevision(c.getStackRevision());
-            } else {
-                newcd.setStackRevision(cls.getLatestClusterDefinition().getStackRevision());
-            }
-            if (c.getDescription() != null) {
-                newcd.setDescription(c.getDescription());
-            } else {
-                newcd.setDescription(cls.getLatestClusterDefinition().getDescription());
-            }
-            if (c.getGoalState() != null) {
-                newcd.setGoalState(c.getGoalState());
-            } else {
-                newcd.setGoalState(cls.getLatestClusterDefinition().getGoalState());
-            }
-            if (c.getActiveServices() != null) {
-                newcd.setActiveServices(c.getActiveServices());
-            } else {
-                newcd.setActiveServices(cls.getLatestClusterDefinition().getActiveServices());
-            }
-            
-            /*
-             * TODO: What if controller is crashed after updateClusterNodesReservation 
-             * before updating and adding new revision of cluster definition?
-             */
-            boolean updateNodesReservation = false;
-            boolean updateNodeToRolesAssociation = false;
-            if (c.getNodes() != null) {
-                newcd.setNodes(c.getNodes());
-                updateNodesReservation = true;
-                
-            } else {
-                newcd.setNodes(cls.getLatestClusterDefinition().getNodes());
-            }
-            if (c.getRoleToNodes() != null) {
-                newcd.setRoleToNodesMap(c.getRoleToNodes());
-                updateNodeToRolesAssociation = true;
-                
-            }  
-            
-            /*
-             * if Cluster goal state is ATTIC then no need to take any action other than
-             * updating the cluster definition.
-             */
-            if (newcd.getGoalState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
-                cls.getClusterState().setLastUpdateTime(new Date());
-                cls.addClusterDefinition(newcd);
-                /*
-                 * TODO: Persist the latest cluster definition under new revision
-                 */
-                return cls.getLatestClusterDefinition();
-            }
-            
-            /*
-             * Validate the updated cluster definition
-             */
-            validateClusterDefinition(newcd);
-            
-            /*
-             * TODO: If dry_run then return the newcd at this point
-             */
-            if (dry_run) {
-                System.out.println ("Dry run for update cluster..");
-                return newcd;
-            }
-            
-            /*
-             * Update the nodes reservation and node to roles association 
-             */
-            if (updateNodesReservation) {
-                updateClusterNodesReservation (cls.getName(), c);   
-            }
-            if (updateNodeToRolesAssociation) {
-                updateNodeToRolesAssociation(newcd.getNodes(), c.getRoleToNodes());
-            }
-            
-            /*
-             *  Update the last update time & revision
-             */
-            cls.getClusterState().setLastUpdateTime(new Date());
-            cls.addClusterDefinition(newcd);
+        /*
+         * TODO: What if controller is crashed after updateClusterNodesReservation 
+         * before updating and adding new revision of cluster definition?
+         */
+        boolean updateNodesReservation = false;
+        boolean updateNodeToRolesAssociation = false;
+        if (c.getNodes() != null) {
+            newcd.setNodes(c.getNodes());
+            updateNodesReservation = true;
             
-            /*
-             * TODO: Persist the latest cluster definition under new revision
-             */
+        } else {
+            newcd.setNodes(cls.getClusterDefinition(-1).getNodes());
+        }
+        if (c.getRoleToNodes() != null) {
+            newcd.setRoleToNodesMap(c.getRoleToNodes());
+            updateNodeToRolesAssociation = true;
             
-            /*
-             * Invoke state machine event
-             */
-            ClusterFSM clusterFSM = StateMachineInvoker.
-                getStateMachineClusterInstance(cls.getName());
-            if(c.getGoalState().equals(ClusterState.CLUSTER_STATE_ACTIVE)) {
-              clusterFSM.activate();
-            } else if(c.getGoalState().
-                equals(ClusterState.CLUSTER_STATE_INACTIVE)) {
-              clusterFSM.deactivate();
-            } else if(c.getGoalState().
-                equals(ClusterState.CLUSTER_STATE_ATTIC)) {
-              clusterFSM.deactivate();
-              clusterFSM.terminate();
-            }
+        }  
+        
+        /*
+         * if Cluster goal state is ATTIC then no need to take any action other than
+         * updating the cluster definition.
+         */
+        if (newcd.getGoalState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
+            ClusterState cs = cls.getClusterState();
+            cs.setLastUpdateTime(Util.getXMLGregorianCalendar(new Date()));
+            cls.updateClusterDefinition(newcd);
+            cls.updateClusterState(cs);
+            return cls.getClusterDefinition(-1);
+        }
+        
+        /*
+         * Validate the updated cluster definition
+         */
+        validateClusterDefinition(newcd);
+        
+        /*
+         * If dry_run then return the newcd at this point
+         */
+        if (dry_run) {
+            System.out.println ("Dry run for update cluster..");
+            return newcd;
+        }
+        
+        /*
+         *  Update the new cluster definition
+         */
+        ClusterState cs = cls.getClusterState();
+        cs.setLastUpdateTime(Util.getXMLGregorianCalendar(new Date()));
+        cls.updateClusterDefinition(newcd);
+        cls.updateClusterState(cs);
+        
+        /*
+         * Update the nodes reservation and node to roles association 
+         */
+        if (updateNodesReservation) {
+            updateClusterNodesReservation (cls.getName(), c);   
+        }
+        if (updateNodeToRolesAssociation) {
+            updateNodeToRolesAssociation(newcd.getNodes(), c.getRoleToNodes());
+        }
+        
+        /*
+         * Invoke state machine event
+         */
+        ClusterFSM clusterFSM = StateMachineInvoker.
+            getStateMachineClusterInstance(cls.getName());
+        if(c.getGoalState().equals(ClusterState.CLUSTER_STATE_ACTIVE)) {
+          clusterFSM.activate();
+        } else if(c.getGoalState().
+            equals(ClusterState.CLUSTER_STATE_INACTIVE)) {
+          clusterFSM.deactivate();
+        } else if(c.getGoalState().
+            equals(ClusterState.CLUSTER_STATE_ATTIC)) {
+          clusterFSM.deactivate();
+          clusterFSM.terminate();
         }
-        return cls.getLatestClusterDefinition();
+     
+        return cls.getClusterDefinition(-1);
+    }
+    
+    /*
+     * Add default values for new cluster definition 
+     */
+    private void setNewClusterDefaults(ClusterDefinition cdef) throws Exception {
+        /* 
+         * Populate the input cluster definition w/ default values
+         */
+        if (cdef.getDescription() == null) { cdef.setDescription("Ambari cluster : "+cdef.getName());
+        }
+        if (cdef.getGoalState() == null) { cdef.setGoalState(ClusterDefinition.GOAL_STATE_INACTIVE);
+        }
+        
+        /*
+         * If it is a new cluster, do not specify the revision; set it to null. A revision number is obtained
+         * after persisting the definition.
+         */
+        cdef.setRevision(null);
+        
+        // TODO: Add the list of active services by querying the plugin component.
+        if (cdef.getActiveServices() == null) {
+            List<String> services = new ArrayList<String>();
+            services.add("ALL");
+            cdef.setActiveServices(services);
+        }    
     }
     
     /* 
@@ -329,36 +396,32 @@ public class Clusters {
      *       are in UNREGISTERED state).  
      */   
     private ClusterDefinition addCluster(ClusterDefinition cdef, boolean dry_run) throws Exception {
-
+        
         /*
          * TODO: Validate the cluster definition and set the default
+         * 
          */
         validateClusterDefinition(cdef);
         
-        /* 
-         * Check if cluster already exist
+        /*
+         * Add the defaults for optional values, if not set
          */
-        if (operational_clusters.containsKey(cdef.getName())) {
-            String msg = "Cluster ["+cdef.getName()+"] already exists";
-                throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.CONFLICT)).get());
-            }
- 
-            /*
+        setNewClusterDefaults(cdef);
+        
+        /*
          * Create new cluster object
          */
         Date requestTime = new Date();
-        Cluster cls = new Cluster();
+        
         ClusterState clsState = new ClusterState();
-        clsState.setCreationTime(requestTime);
-        clsState.setLastUpdateTime(requestTime);
-        clsState.setDeployTime((Date)null);          
+        clsState.setCreationTime(Util.getXMLGregorianCalendar(requestTime));
+        clsState.setLastUpdateTime(Util.getXMLGregorianCalendar(requestTime));
+        clsState.setDeployTime(Util.getXMLGregorianCalendar((Date)null));          
         if (cdef.getGoalState().equals(ClusterDefinition.GOAL_STATE_ATTIC)) {
             clsState.setState(ClusterState.CLUSTER_STATE_ATTIC);
         } else {
             clsState.setState(ClusterDefinition.GOAL_STATE_INACTIVE);
         }
-        cls.addClusterDefinition(cdef);
-        cls.setClusterState(clsState);
         
         /*
          * If dry run then update roles to nodes map, if not specified explicitly
@@ -371,6 +434,13 @@ public class Clusters {
         }
         
         /*
+         * Persist the new cluster and add entry to cache
+         * TODO: Persist reserved nodes against the cluster & service/role? 
+         * 
+         */
+        Cluster cls = this.addClusterEntry(cdef, clsState);
+        
+        /*
          * Update cluster nodes reservation. 
          */
         if (cdef.getNodes() != null 
@@ -396,20 +466,12 @@ public class Clusters {
         }
         
         /*
-         * TODO: Persist the cluster definition to data store as a initial version r0. 
-         *          Persist reserved nodes against the cluster & service/role
-         */
-            
-        // Add the cluster to the list, after definition is persisted
-        this.operational_clusters.put(cdef.getName(), cls);
-        
-        /*
          * Activate the cluster if the goal state is ACTIVE
          * TODO: What to do if activate fails ??? 
         */
         if(cdef.getGoalState().equals(ClusterDefinition.GOAL_STATE_ACTIVE)) {          
             org.apache.ambari.resource.statemachine.ClusterFSM cs = 
-                StateMachineInvoker.createCluster(cls,cls.getLatestRevision(),
+                StateMachineInvoker.createCluster(cls,cls.getLatestRevisionNumber(),
                     cls.getClusterState());
             cs.activate();
         }
@@ -426,7 +488,7 @@ public class Clusters {
     }
     
     /*
-     * Validates the cluster definition
+     * Validate the cluster definition
      * TODO: Validate each role has enough nodes associated with it. 
      */
     private void validateClusterDefinition (ClusterDefinition cdef) throws Exception {
@@ -469,20 +531,6 @@ public class Clusters {
             }
         }
         
-        /* 
-         * Populate the input cluster definition w/ default values
-         */
-        if (cdef.getDescription() == null) { cdef.setDescription("Ambari cluster : "+cdef.getName());
-        }
-        if (cdef.getGoalState() == null) { cdef.setGoalState(cdef.GOAL_STATE_INACTIVE);
-        }
-        
-        // TODO: Add the list of active services by querying pluging component.
-        if (cdef.getActiveServices() == null) {
-            List<String> services = new ArrayList<String>();
-            services.add("ALL");
-            cdef.setActiveServices(services);
-        }
         
         /*
          * Check if all the nodes explicitly specified in the RoleToNodesMap belong the cluster node range specified 
@@ -517,7 +565,7 @@ public class Clusters {
        
         /*
          * Reserve the nodes as specified in the node range expressions
-         * -- throw exception if any nodes are pre-associated with other cluster
+         * -- throw exception, if any nodes are pre-associated with other cluster
          */    
         List<String> nodes_currently_allocated_to_cluster = new ArrayList<String>();
         for (Node n : Nodes.getInstance().getNodes().values()) {
@@ -593,7 +641,7 @@ public class Clusters {
     }
 
     /*
-     * This function disassociate the node from the cluster. The clsuterID associated w/
+     * This function disassociates all the nodes from the cluster. The clusterID associated w/
      * cluster will be reset by heart beat when node reports all clean.
      */
     public synchronized void releaseClusterNodes (String clusterName) throws Exception {
@@ -644,14 +692,14 @@ public class Clusters {
      * Get Cluster stack
      */
     public Stack getClusterStack(String clusterName, boolean expanded) throws Exception {
-        if (!this.operational_clusters.containsKey(clusterName)) {
+        if (!this.clusterExists(clusterName)) {
             String msg = "Cluster ["+clusterName+"] does not exist";
             throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
         }
         
-        Cluster cls = this.operational_clusters.get(clusterName);
-        String stackName = cls.getLatestClusterDefinition().getStackName();
-        int stackRevision = Integer.parseInt(cls.getLatestClusterDefinition().getStackRevision());
+        Cluster cls = this.getClusterByName(clusterName);
+        String stackName = cls.getClusterDefinition(-1).getStackName();
+        int stackRevision = Integer.parseInt(cls.getClusterDefinition(-1).getStackRevision());
         
         Stack bp;
         if (!expanded) {
@@ -663,70 +711,61 @@ public class Clusters {
         return bp;
     }
     
-
     
     /*
      * Delete Cluster 
-     * Delete operation will bring the cluster to ATTIC state and then remove the
-     * cluster definition from the controller 
-     * When cluster state transitions to ATTIC, it should check if the cluster definition is 
-     * part of tobe_deleted_clusters map and then delete the definition.
-     * TODO: Delete definition from both operational_clusters and operational_clusters_id_name map and to_be_deleted 
-     * clusters list.
-     */
-    public void deleteCluster(String clusterName) throws Exception { 
-        synchronized (this.operational_clusters) {
-            for (Cluster cls : this.operational_clusters.values()) {
-                if (cls.getLatestClusterDefinition().getName().equals(clusterName)) {
-                    synchronized (cls) {
-                        ClusterDefinition cdf = new ClusterDefinition();
-                        cdf.setName(clusterName);
-                        cdf.setGoalState(ClusterState.CLUSTER_STATE_ATTIC);
-                        updateCluster(clusterName, cdf, false);
-                        /* Update cluster state, mark it "to be deleted" when gets to ATTIC state
-                         * TODO: PERSIST the new flag in the cluster state
-                         */
-                        cls.getClusterState().setMarkForDeletionWhenInAttic(true);          
-                        
-                    }
-                } 
-            }
-        }
-    }   
-     
-    /* 
-     * Get the cluster by name
+     * Delete operation will mark the cluster to_be_deleted and then set the goal state to ATTIC
+     * Once cluster gets to ATTIC state, background daemon should purge the cluster entry.
      */
-    public Cluster getClusterByName(String clusterName) {
-        return this.operational_clusters.get(clusterName);
-    }
-    
+    public synchronized void deleteCluster(String clusterName) throws Exception { 
+
+        if (!this.clusterExists(clusterName)) {
+            System.out.println("Cluster ["+clusterName+"] does not exist!");
+            return;
+        }
+        
+        /*
+         * Update the cluster definition with goal state to be ATTIC
+         */
+        Cluster cls = this.getClusterByName(clusterName);   
+        ClusterDefinition cdf = new ClusterDefinition();
+        cdf.setName(clusterName);
+        cdf.setGoalState(ClusterState.CLUSTER_STATE_ATTIC);
+        cls.updateClusterDefinition(cdf);
+        
+        /* 
+         * Update cluster state, mark it "to be deleted"
+         */
+        ClusterState cs = cls.getClusterState();
+        cs.setMarkForDeletionWhenInAttic(true); 
+        cls.updateClusterState(cs);
+    }      
     
     /*
      * Get the latest cluster definition
      */
-    public ClusterDefinition getLatestClusterDefinition(String clusterName) {
-        return this.operational_clusters.get(clusterName).getLatestClusterDefinition();
+    public ClusterDefinition getLatestClusterDefinition(String clusterName) throws Exception {
+        return this.getClusterByName(clusterName).getClusterDefinition(-1);
     }
     
     /*
      * Get Cluster Definition given name and revision
      */
-    public ClusterDefinition getClusterDefinition(String clusterName, long revision) {
-        return this.operational_clusters.get(clusterName).getClusterDefinition(revision);
+    public ClusterDefinition getClusterDefinition(String clusterName, int revision) throws Exception {
+        return this.getClusterByName(clusterName).getClusterDefinition(revision);
     }
     
     /* 
      * Get the cluster Information by name
      */
     public ClusterInformation getClusterInformation (String clusterName) throws Exception  {
-        if (!this.operational_clusters.containsKey(clusterName)) {
+        if (!this.clusterExists(clusterName)) {
             String msg = "Cluster ["+clusterName+"] does not exist";
             throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
         }
         ClusterInformation clsInfo = new ClusterInformation();
         clsInfo.setDefinition(this.getLatestClusterDefinition(clusterName));
-        clsInfo.setState(this.operational_clusters.get(clusterName).getClusterState());
+        clsInfo.setState(this.getClusterByName(clusterName).getClusterState());
         return clsInfo;
     }
     
@@ -734,30 +773,32 @@ public class Clusters {
     /* 
      * Get the cluster state
     */
-    public ClusterState getClusterState(String clusterName) throws WebApplicationException {
-        if (!this.operational_clusters.containsKey(clusterName)) {
+    public ClusterState getClusterState(String clusterName) throws Exception {
+        if (!this.clusterExists(clusterName)) {
             String msg = "Cluster ["+clusterName+"] does not exist";
             throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
         }
-        return this.operational_clusters.get(clusterName).getClusterState();
+        return this.getClusterByName(clusterName).getClusterState();
     }
     
     
     /*
      * Get Cluster Information list i.e. cluster definition and cluster state
      */
-    public List<ClusterInformation> getClusterInformationList(String state) {
+    public List<ClusterInformation> getClusterInformationList(String state) throws Exception {
       List<ClusterInformation> list = new ArrayList<ClusterInformation>();
-      for (Cluster cls : this.operational_clusters.values()) {
+      List<String> clusterNames = dataStore.retrieveClusterList();
+      for (String clsName : clusterNames) {
+        Cluster cls = this.getClusterByName(clsName);
         if (state.equals("ALL")) {
           ClusterInformation clsInfo = new ClusterInformation();
-          clsInfo.setDefinition(cls.getLatestClusterDefinition());
+          clsInfo.setDefinition(cls.getClusterDefinition(-1));
           clsInfo.setState(cls.getClusterState());
           list.add(clsInfo);
         } else {
           if (cls.getClusterState().getState().equals(state)) {
               ClusterInformation clsInfo = new ClusterInformation();
-              clsInfo.setDefinition(cls.getLatestClusterDefinition());
+              clsInfo.setDefinition(cls.getClusterDefinition(-1));
               clsInfo.setState(cls.getClusterState());
               list.add(clsInfo);
           }
@@ -770,14 +811,16 @@ public class Clusters {
      * Get the list of clusters
      * TODO: Get the synchronized snapshot of each cluster definition? 
      */
-    public List<Cluster> getClustersList(String state) {
+    public List<Cluster> getClustersList(String state) throws Exception {
         List<Cluster> list = new ArrayList<Cluster>();
-        if (state.equals("ALL")) {
-          list.addAll(this.operational_clusters.values());
-        } else {
-          for (Cluster cls : this.operational_clusters.values()) {
+        List<String> clusterNames = dataStore.retrieveClusterList();
+        for (String clsName : clusterNames) {
+          Cluster cls = this.getClusterByName(clsName);
+          if (state.equals("ALL")) {
+            list.add(cls);
+          } else {
             if (cls.getClusterState().getState().equals(state)) {
-              list.add(cls);
+                list.add(cls);
             }
           }
         }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Thu Nov  3 07:48:30 2011
@@ -89,7 +89,7 @@ public class HeartbeatHandler {
           .getNodeState().getClusterName();
       if (clusterName != null) {
         clusterRev = Clusters.getInstance().
-            getClusterByName(clusterName).getLatestRevision(); 
+            getClusterByName(clusterName).getLatestRevisionNumber(); 
       }
       
       ComponentAndRoleStates componentStates = 
@@ -576,19 +576,23 @@ public class HeartbeatHandler {
   private void inspectAgentState(HeartBeat heartbeat, 
       ComponentAndRoleStates componentServers)
           throws IOException {
-    List<AgentRoleState> agentRoleStates = 
-        heartbeat.getInstalledRoleStates();
-    if (agentRoleStates == null) {
-      return;
-    }
-    List<Cluster> clustersNodeBelongsTo = new ArrayList<Cluster>();
-    for (AgentRoleState agentRoleState : agentRoleStates) {
-      componentServers.recordRoleState(heartbeat.getHostname(),agentRoleState);
-      Cluster c = Clusters.getInstance().
-          getClusterByName(agentRoleState.getClusterId());
-      clustersNodeBelongsTo.add(c);
-    }
-    checkActionResults(heartbeat, componentServers);
+      try {
+        List<AgentRoleState> agentRoleStates = 
+            heartbeat.getInstalledRoleStates();
+        if (agentRoleStates == null) {
+          return;
+        }
+        List<Cluster> clustersNodeBelongsTo = new ArrayList<Cluster>();
+        for (AgentRoleState agentRoleState : agentRoleStates) {
+          componentServers.recordRoleState(heartbeat.getHostname(),agentRoleState);
+          Cluster c = Clusters.getInstance().
+              getClusterByName(agentRoleState.getClusterId());
+          clustersNodeBelongsTo.add(c);
+        }
+        checkActionResults(heartbeat, componentServers);
+      } catch (Exception e) {
+          throw new IOException (e);
+      }
   }
   
   private void checkActionResults(HeartBeat heartbeat,

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java Thu Nov  3 07:48:30 2011
@@ -85,7 +85,7 @@ public class Nodes {
     public List<Node> getClusterNodes (String clusterName, String roleName, String alive) throws Exception {
         
         List<Node> list = new ArrayList<Node>();
-        ClusterDefinition c = Clusters.getInstance().operational_clusters.get(clusterName).getLatestClusterDefinition();
+        ClusterDefinition c = Clusters.getInstance().operational_clusters.get(clusterName).getClusterDefinition(-1);
         if (c.getNodes() == null || c.getNodes().equals("") || Clusters.getInstance().getClusterByName(clusterName).getClusterState().getState().equalsIgnoreCase("ATTIC")) {
             String msg = "No nodes are reserved for the cluster. Typically cluster in ATTIC state does not have any nodes reserved";
             throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NO_CONTENT)).get());

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java Thu Nov  3 07:48:30 2011
@@ -293,8 +293,8 @@ public class Stacks {
     public Hashtable<String, String> getClusterReferencedStacksList() throws Exception {
         Hashtable<String, String> clusterStacks = new Hashtable<String, String>();
         for (Cluster c : Clusters.getInstance().operational_clusters.values()) {
-            String cBPName = c.getLatestClusterDefinition().getStackName();
-            String cBPRevision = c.getLatestClusterDefinition().getStackRevision();
+            String cBPName = c.getClusterDefinition(-1).getStackName();
+            String cBPRevision = c.getClusterDefinition(-1).getStackRevision();
             Stack bpx = this.getStack(cBPName, Integer.parseInt(cBPRevision));
             clusterStacks.put(cBPName+"-"+cBPRevision, "");
             while (bpx.getParentName() != null) {

Added: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java?rev=1196984&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java (added)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java Thu Nov  3 07:48:30 2011
@@ -0,0 +1,20 @@
+package org.apache.ambari.controller;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.XMLGregorianCalendar;
+
+public class Util { // Static conversion helper for the controller package.
+    /** Converts a java.util.Date to an XMLGregorianCalendar; a null date yields null. */
+    public static XMLGregorianCalendar getXMLGregorianCalendar (Date date) throws Exception {
+        if (date == null) {
+            return null; // null is passed through rather than rejected
+        }
+        GregorianCalendar cal = new GregorianCalendar(); // no-arg constructor: JVM default time zone and locale
+        cal.setTime(date);
+        return DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);   
+    }
+
+}

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java Thu Nov  3 07:48:30 2011
@@ -29,6 +29,7 @@ import org.apache.ambari.common.rest.ent
 import org.apache.ambari.common.rest.entities.NodeState;
 import org.apache.ambari.common.rest.entities.RoleToNodes;
 import org.apache.ambari.common.rest.entities.Stack;
+import org.apache.ambari.controller.Util;
 import org.apache.ambari.common.rest.entities.StackInformation;
 
 public class Examples {
@@ -77,8 +78,8 @@ public class Examples {
         
         CLUSTER_STATE.setState("ATTIC");
         try {
-			CLUSTER_STATE.setCreationTime(new Date());
-			CLUSTER_STATE.setDeployTime(new Date());
+			CLUSTER_STATE.setCreationTime(Util.getXMLGregorianCalendar(new Date()));
+			CLUSTER_STATE.setDeployTime(Util.getXMLGregorianCalendar(new Date()));
 		} catch (Exception e) {
 		}
         NODE.setName("localhost");

Added: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java?rev=1196984&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java (added)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java Thu Nov  3 07:48:30 2011
@@ -0,0 +1,24 @@
+package org.apache.ambari.datastore;
+
+import java.io.IOException;
+
+import org.apache.ambari.datastore.impl.ZookeeperDS;
+
+public class DataStoreFactory { // Selects a PersistentDataStore implementation by store-type name.
+    /** Store-type name for which getDataStore returns ZookeeperDS.getInstance(). */
+    public static String  ZOOKEEPER_TYPE = "zookeeper";
+    /** Returns ZookeeperDS.getInstance() when storeType equals ZOOKEEPER_TYPE ignoring case; otherwise null. */
+    public static PersistentDataStore getDataStore(String storeType) {
+        if (storeType.equalsIgnoreCase(ZOOKEEPER_TYPE)) { // NOTE(review): a null storeType throws NullPointerException here
+            return ZookeeperDS.getInstance();
+        }
+        return null; // unrecognized store type
+    }
+    /** NOTE(review): the parameter is String, not String[], so the JVM cannot launch this as a program entry point. */
+    public static void main (String args) {
+        try {
+            PersistentDataStore ds = DataStoreFactory.getDataStore(ZOOKEEPER_TYPE);
+        } catch (Exception e) { // NOTE(review): any failure is silently discarded; ds is never used
+        }
+    }
+}

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java Thu Nov  3 07:48:30 2011
@@ -3,9 +3,9 @@ package org.apache.ambari.datastore;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.ambari.common.rest.entities.ClusterState;
 import org.apache.ambari.common.rest.entities.Stack;
 import org.apache.ambari.common.rest.entities.ClusterDefinition;
-import org.apache.ambari.common.rest.entities.ClusterState;
 
 public interface PersistentDataStore {
     
@@ -16,10 +16,30 @@ public interface PersistentDataStore {
     public void close () throws IOException;
     
     /**
-     * Persist the cluster definition.
-     * 
-     * Create new cluster entry, if one does not exist already else add new revision to existing cluster
-     * Return the revision number for each newly added cluster definition
+     * Check if cluster exists
+     */
+    public boolean clusterExists(String clusterName) throws IOException;
+    
+    /**
+     * Get Latest cluster Revision Number
+     */
+    public int retrieveLatestClusterRevisionNumber(String clusterName) throws IOException;
+    
+    /**
+     * Store the cluster state
+     */
+    public void storeClusterState (String clusterName, ClusterState clsState) throws IOException;
+    
+    /**
+     * Retrieve the cluster state
+     */
+    public ClusterState retrieveClusterState (String clusterName) throws IOException;
+
+    /**
+     * Store the cluster definition.
+     *
+     * Return the revision number for new or updated cluster definition
+     * If cluster revision is not null then, check if existing revision being updated in the store is same.
      */
     public int storeClusterDefinition (ClusterDefinition clusterDef) throws IOException;
     
@@ -39,10 +59,9 @@ public interface PersistentDataStore {
     }
   
     /**
-     * Retrieve all cluster definitions with their latest revisions
-     * 
+     * Retrieve list of existing cluster names
      */
-    public List<NameRevisionPair> retrieveClusterList () throws IOException;
+    public List<String> retrieveClusterList () throws IOException;
     
     
     /**

Added: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java?rev=1196984&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java (added)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java Thu Nov  3 07:48:30 2011
@@ -0,0 +1,359 @@
+package org.apache.ambari.datastore.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.common.rest.entities.ClusterDefinition;
+import org.apache.ambari.common.rest.entities.ClusterState;
+import org.apache.ambari.common.rest.entities.Stack;
+import org.apache.ambari.common.util.JAXBUtil;
+import org.apache.ambari.controller.Stacks;
+import org.apache.ambari.datastore.PersistentDataStore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class ZookeeperDS implements PersistentDataStore, Watcher {
+
+    private static final String DEFAULT_ZOOKEEPER_ADDRESS="localhost:2181";
+    private static final String ZOOKEEPER_ROOT_PATH="/ambari";
+    private static final String ZOOKEEPER_CLUSTERS_ROOT_PATH=ZOOKEEPER_ROOT_PATH+"/clusters";
+    private static final String ZOOKEEPER_STACKS_ROOT_PATH=ZOOKEEPER_ROOT_PATH+"/stacks";
+    
+    private ZooKeeper zk;
+    private String credential = null;
+    // Set to true by process() when a SyncConnected event arrives and polled by the
+    // constructor's wait loop. The watcher callback is delivered on a different thread
+    // than the one constructing this object, so the flag must be volatile for the
+    // polling thread to be guaranteed to observe the update.
+    private volatile boolean zkCoonected = false;
+    
+    private static ZookeeperDS ZookeeperDSRef=null;
+    /**
+     * Connects to the ZooKeeper server, waits until the connection is reported as
+     * established and makes sure the top level Ambari paths exist.
+     *
+     * Failures are only printed; the constructor never throws.
+     */
+    private ZookeeperDS() {
+        /*
+         * TODO: Read ZooKeeper address and credential from config file
+         */
+        String zookeeperAddress = DEFAULT_ZOOKEEPER_ADDRESS;
+        try {
+            /*
+             * Connect to ZooKeeper server
+             */
+            zk = new ZooKeeper(zookeeperAddress, 600000, this);
+            if(credential != null) {
+              zk.addAuthInfo("digest", credential.getBytes());
+            }
+            
+            // zkCoonected is flipped by process() once a SyncConnected event is received
+            while (!this.zkCoonected) {
+                Thread.sleep(5000);
+                System.out.println("Waiting for ZK connection");
+            }
+            
+            /*
+             * Create top level directories
+             */
+            createDirectory (ZOOKEEPER_ROOT_PATH, new byte[0], true);
+            createDirectory (ZOOKEEPER_CLUSTERS_ROOT_PATH, new byte[0], true);
+            createDirectory (ZOOKEEPER_STACKS_ROOT_PATH, new byte[0], true);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller instead of silently dropping it
+            Thread.currentThread().interrupt();
+            e.printStackTrace();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+    
+    /**
+     * Returns the shared ZookeeperDS instance, creating it on the first call.
+     */
+    public static synchronized ZookeeperDS getInstance() {
+        if (ZookeeperDSRef != null) {
+            return ZookeeperDSRef;
+        }
+        ZookeeperDSRef = new ZookeeperDS();
+        return ZookeeperDSRef;
+    }
+
+    public Object clone() throws CloneNotSupportedException {
+        throw new CloneNotSupportedException(); // always refuses: instances are only handed out through getInstance()
+    }
+    
+    @Override
+    public void close() throws IOException {
+        // TODO: not implemented yet; the ZooKeeper handle (zk) is not closed here
+        
+    }
+
+    @Override
+    public boolean clusterExists(String clusterName) throws IOException {
+        try {
+            if (zk.exists(ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName, false) == null) {
+                return false;
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        return true;
+    }
+    
+    /**
+     * Stores the given cluster definition as a new revision.
+     *
+     * For a cluster that does not exist yet, the cluster node, revision 0 and the
+     * "latestRevisionNumber" node are created. For an existing cluster the next
+     * revision node is created and "latestRevisionNumber" is advanced.
+     *
+     * @param clusterDef definition to store; if its revision is not null it must equal
+     *                   the latest revision currently recorded in the store
+     * @return the revision number under which the definition was stored
+     * @throws IOException if the revision check fails or the ZooKeeper operation fails
+     */
+    @Override
+    public synchronized int storeClusterDefinition(ClusterDefinition clusterDef) throws IOException {  
+        try {
+            String clusterPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterDef.getName();
+            String clusterLatestRevisionNumberPath = clusterPath+"/latestRevisionNumber";
+            if (zk.exists(clusterPath, false) == null) {
+                /* 
+                 * New cluster: create the cluster path, revision 0 and the node 
+                 * recording the latest revision number.
+                 */
+                int firstRev = 0;
+                createDirectory (clusterPath, new byte[0], false);
+                createDirectory (clusterPath+"/"+firstRev, JAXBUtil.write(clusterDef), false);
+                createDirectory (clusterLatestRevisionNumberPath, Integer.toString(firstRev).getBytes(), false);
+                return firstRev;
+            }
+            String latestRevision = new String (zk.getData(clusterLatestRevisionNumberPath, false, new Stat()));
+            int newRev = Integer.parseInt(latestRevision) + 1;
+            // Reject the update when the client based its change on a stale revision
+            if (clusterDef.getRevision() != null && !latestRevision.equals(clusterDef.getRevision())) {
+                throw new IOException ("Latest cluster definition does not match the one client intends to modify!");
+            }
+            createDirectory (clusterPath+"/"+newRev, JAXBUtil.write(clusterDef), false);
+            zk.setData(clusterLatestRevisionNumberPath, Integer.toString(newRev).getBytes(), -1);
+            return newRev;
+        } catch (KeeperException e) {
+            throw new IOException (e);
+        } catch (InterruptedException e1) {
+            // Restore the interrupt status before translating to IOException
+            Thread.currentThread().interrupt();
+            throw new IOException (e1);
+        }
+    }
+
+    /**
+     * Writes the cluster state to the "state" node of the named cluster, creating the
+     * node when it does not exist yet and overwriting its data otherwise.
+     *
+     * @param clusterName name of the cluster whose state is stored
+     * @param clsState    state to serialize and store
+     * @throws IOException if serialization or the ZooKeeper operation fails
+     */
+    @Override
+    public synchronized void storeClusterState(String clusterName, ClusterState clsState)
+            throws IOException {
+        try {
+            String clusterStatePath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/state";
+            if (zk.exists(clusterStatePath, false) == null) {
+                // create node for the cluster state
+                createDirectory (clusterStatePath, JAXBUtil.write(clsState), false);
+            } else {
+                // version -1: overwrite whatever version is currently stored
+                zk.setData(clusterStatePath, JAXBUtil.write(clsState), -1);
+            }
+        } catch (KeeperException e) {
+            throw new IOException (e);
+        } catch (InterruptedException e1) {
+            // Restore the interrupt status before translating to IOException
+            Thread.currentThread().interrupt();
+            throw new IOException (e1);
+        }
+    }
+    
+    @Override
+    public ClusterDefinition retrieveClusterDefinition(String clusterName, int revision) throws IOException {
+        try {
+            Stat stat = new Stat();
+            String clusterRevisionPath;
+            if (revision < 0) {   
+                String clusterLatestRevisionNumberPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/latestRevisionNumber";
+                String latestRevisionNumber = new String (zk.getData(clusterLatestRevisionNumberPath, false, stat));
+                clusterRevisionPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/"+latestRevisionNumber;       
+            } else {
+                clusterRevisionPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/"+revision;
+            }
+            ClusterDefinition cdef = JAXBUtil.read(zk.getData(clusterRevisionPath, false, stat), ClusterDefinition.class); 
+            return cdef;
+        } catch (Exception e) {
+            throw new IOException (e);
+        }
+    }
+
+    @Override
+    public ClusterState retrieveClusterState(String clusterName) throws IOException {
+        try {
+            Stat stat = new Stat();
+            String clusterStatePath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/state";
+            ClusterState clsState = JAXBUtil.read(zk.getData(clusterStatePath, false, stat), ClusterState.class); 
+            return clsState;
+        } catch (Exception e) {
+            throw new IOException (e);
+        }
+    }
+    
+    @Override
+    public int retrieveLatestClusterRevisionNumber(String clusterName) throws IOException {
+        int revisionNumber;
+        try {
+            Stat stat = new Stat();
+            String clusterLatestRevisionNumberPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/latestRevisionNumber";
+            String latestRevisionNumber = new String (zk.getData(clusterLatestRevisionNumberPath, false, stat));
+            revisionNumber = Integer.parseInt(latestRevisionNumber);
+        } catch (Exception e) {
+            throw new IOException (e);
+        }
+        return revisionNumber;
+    }
+    
+    /**
+     * Lists the names of the child nodes found under the clusters root path.
+     *
+     * @return the cluster names as returned by ZooKeeper
+     * @throws IOException if the ZooKeeper operation fails or is interrupted
+     */
+    @Override
+    public List<String> retrieveClusterList() throws IOException {
+        try {
+            return zk.getChildren(ZOOKEEPER_CLUSTERS_ROOT_PATH, false);
+        } catch (KeeperException e) {
+            throw new IOException (e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status before translating to IOException
+            Thread.currentThread().interrupt();
+            throw new IOException (e);
+        }
+    }
+
+    /**
+     * Deletes the named cluster: first every direct child of the cluster node, then
+     * the cluster node itself. A cluster that does not exist is silently ignored.
+     *
+     * @param clusterName name of the cluster to delete
+     * @throws IOException wrapping any other failure raised while deleting
+     */
+    @Override
+    public void deleteCluster(String clusterName) throws IOException {
+        String clusterPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName;
+        try {
+            // getChildren returns bare child names, so each one has to be resolved
+            // against the cluster path before it can be deleted
+            for (String childName : zk.getChildren(clusterPath, false)) {
+                try {
+                    zk.delete(clusterPath + "/" + childName, -1);
+                } catch (KeeperException.NoNodeException alreadyGone) {
+                    // child disappeared in the meantime; nothing left to delete
+                }
+            }
+            zk.delete(clusterPath, -1);
+        } catch (KeeperException.NoNodeException ke) {
+            // cluster node does not exist (any more); treat as already deleted
+            return;
+        } catch (Exception e) {
+            throw new IOException (e);
+        }
+    }
+
+    @Override
+    public void purgeClusterDefinitionRevisions(String clusterName,
+            int lessThanRevision) throws IOException {
+        // TODO: not implemented yet; currently a no-op, no revisions are removed
+        
+    }
+
+    @Override
+    public void updateClusterState(String clusterName, ClusterState newstate)
+            throws IOException {
+        // TODO: not implemented yet; currently a no-op, newstate is not persisted here
+        
+    }
+
+    @Override
+    public int storeStack(String stackName, Stack stack) throws IOException {
+        // TODO: not implemented yet; the stack is not persisted
+        return 0; // placeholder value, not a real revision number
+    }
+
+    @Override
+    public Stack retrieveStack(String stackName, int revision)
+            throws IOException {
+        // TODO: not implemented yet; nothing is read from the store
+        return null; // placeholder: callers always receive null for now
+    }
+
+    @Override
+    public List<NameRevisionPair> retrieveStackList() throws IOException {
+        // TODO: not implemented yet; nothing is read from the store
+        return null; // placeholder: callers receive null, not an empty list
+    }
+
+    @Override
+    public int deleteStack(String stackName) throws IOException {
+        // TODO: not implemented yet; nothing is deleted
+        return 0; // placeholder value
+    }
+
+    @Override
+    public void deleteStackRevisions(String stackName, int lessThanRevision)
+            throws IOException {
+        // TODO: not implemented yet; currently a no-op, no revisions are removed
+        
+    }
+
+    @Override
+    public void updateComponentState(String clusterName, String componentName,
+            String state) throws IOException {
+        // TODO: not implemented yet; currently a no-op, the state is not persisted
+        
+    }
+
+    @Override
+    public String getComponentState(String clusterName, String componentName)
+            throws IOException {
+        // TODO: not implemented yet; nothing is read from the store
+        return null; // placeholder: callers always receive null for now
+    }
+
+    @Override
+    public void deleteComponentState(String clusterName, String componentName)
+            throws IOException {
+        // TODO: not implemented yet; currently a no-op, nothing is deleted
+        
+    }
+
+    @Override
+    public void updateRoleState(String clusterName, String componentName,
+            String roleName, String state) throws IOException {
+        // TODO: not implemented yet; currently a no-op, the state is not persisted
+        
+    }
+
+    @Override
+    public String getRoleState(String clusterName, String componentName,
+            String RoleName) throws IOException {
+        // TODO: not implemented yet; nothing is read from the store
+        return null; // placeholder: callers always receive null for now
+    }
+
+    @Override
+    public void deleteRoleState(String clusterName, String componentName,
+            String roleName) throws IOException {
+        // TODO: not implemented yet; currently a no-op, nothing is deleted
+        
+    }
+
+    /**
+     * Watcher callback. Only connection-state notifications (event type None) are
+     * examined; a SyncConnected state marks the connection as established.
+     *
+     * @param event event delivered by the ZooKeeper client
+     */
+    @Override
+    public void process(WatchedEvent event) {
+        if (event.getType() != Event.EventType.None) {
+            // Not a connection-state change; nothing to do
+            return;
+        }
+        switch (event.getState()) {
+        case SyncConnected:
+            // Releases the wait loop in the constructor
+            this.zkCoonected = true;
+            break;
+        default:
+            // Every other state, including Expired, is currently ignored
+            break;
+        }
+    }
+    
+    /**
+     * Creates a persistent node at the given path with the given initial data. When a
+     * credential is configured, the node's ACL is then restricted to the creator.
+     *
+     * @param path           absolute path of the node to create
+     * @param initialData    data stored in the new node
+     * @param ignoreIfExists when true, an already existing node is silently accepted;
+     *                       when false, the NodeExistsException is rethrown
+     * @throws KeeperException      if ZooKeeper rejects the operation
+     * @throws InterruptedException if the ZooKeeper call is interrupted
+     */
+    private void createDirectory(String path, byte[] initialData, boolean ignoreIfExists) throws KeeperException, InterruptedException {
+        try {
+            zk.create(path, initialData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            if (credential != null) {
+                zk.setACL(path, Ids.CREATOR_ALL_ACL, -1);
+            }
+            System.out.println("Created path : <" + path +">");
+        } catch (KeeperException.NodeExistsException alreadyThere) {
+            if (ignoreIfExists) {
+                return;
+            }
+            System.out.println("Path already exists <"+path+">");
+            throw alreadyThere;
+        } catch (KeeperException.AuthFailedException authFailure) {
+            System.out.println("Failed to authenticate for path <"+path+">");
+            throw authFailure;
+        }
+    }
+}

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java Thu Nov  3 07:48:30 2011
@@ -109,9 +109,8 @@ public class ClusterImpl implements Clus
   private ClusterState clusterState;
   private static Log LOG = LogFactory.getLog(ClusterImpl.class);
     
-  public ClusterImpl(Cluster cluster, long revision, 
-      ClusterState clusterState) 
-      throws IOException {
+  public ClusterImpl(Cluster cluster, int revision, 
+      ClusterState clusterState) throws IOException {
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java Thu Nov  3 07:48:30 2011
@@ -76,7 +76,7 @@ public class StateMachineInvoker {
   private static ConcurrentMap<String, ClusterFSM> clusters = 
       new ConcurrentHashMap<String, ClusterFSM>();
   
-  public static ClusterFSM createCluster(Cluster cluster, long revision, 
+  public static ClusterFSM createCluster(Cluster cluster, int revision, 
       ClusterState state) throws IOException {
     ClusterImpl clusterFSM = new ClusterImpl(cluster, revision, state);
     clusters.put(cluster.getName(), clusterFSM);