You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by vg...@apache.org on 2011/11/03 08:48:31 UTC
svn commit: r1196984 - in /incubator/ambari/trunk: ./
client/src/main/java/org/apache/ambari/common/rest/entities/
controller/src/main/java/org/apache/ambari/controller/
controller/src/main/java/org/apache/ambari/controller/rest/config/
controller/src/...
Author: vgogate
Date: Thu Nov 3 07:48:30 2011
New Revision: 1196984
URL: http://svn.apache.org/viewvc?rev=1196984&view=rev
Log:
AMBARI-124
Added:
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java
incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Thu Nov 3 07:48:30 2011
@@ -2,6 +2,10 @@ Ambari Change log
Release 0.1.0 - unreleased
+ AMBARI-124. Add Zookeeper Data store and persist the cluster definitions across controller restart (vgogate)
+
+ AMBARI-116. Change the name group to provider in hadoop-security-0.xml stack definition (vgogate)
+
AMBARI-120. Fixed REST resource annotation bugs. (Eric Yang)
AMBARI-121. Added examples for returning REST resources. (Eric Yang)
Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterDefinition.java Thu Nov 3 07:48:30 2011
@@ -70,6 +70,17 @@ public class ClusterDefinition {
protected String name = null;
/**
+ * Every cluster update creates a new revision and returned through this field.
+ * This field can be optionally be set durint the update to latest revision
+ * (currently checked out revision) of the cluster being updated and if so,
+ * Ambari will prevent the update, if the latest revision of the cluster changed
+ * in the background before update. If not specified update will over-write current
+ * latest revision.
+ */
+ @XmlAttribute
+ protected String revision = null;
+
+ /**
* A user-facing comment about the cluster about what it is intended for.
*/
@XmlAttribute
@@ -116,6 +127,13 @@ public class ClusterDefinition {
/**
+ * @return the roleToNodesMap
+ */
+ public List<RoleToNodes> getRoleToNodesMap() {
+ return roleToNodesMap;
+ }
+
+ /**
* @return the stackRevision
*/
public String getStackRevision() {
@@ -226,4 +244,20 @@ public class ClusterDefinition {
public void setRoleToNodesMap(List<RoleToNodes> roleToNodesMap) {
this.roleToNodesMap = roleToNodesMap;
}
+
+
+ /**
+ * @return the revision
+ */
+ public String getRevision() {
+ return revision;
+ }
+
+ /**
+ * @param revision the revision to set
+ */
+ public void setRevision(String revision) {
+ this.revision = revision;
+ }
+
}
Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/entities/ClusterState.java Thu Nov 3 07:48:30 2011
@@ -17,18 +17,13 @@
*/
package org.apache.ambari.common.rest.entities;
-import java.util.Date;
-import java.util.GregorianCalendar;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlSchemaType;
-import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
-
/**
* The state of a cluster.
*
@@ -106,19 +101,6 @@ public class ClusterState {
public void setCreationTime(XMLGregorianCalendar creationTime) {
this.creationTime = creationTime;
}
-
- /**
- * @param creationTime the creationTime to set
- */
- public void setCreationTime(Date creationTime) throws Exception {
- if (creationTime == null) {
- this.creationTime = null;
- } else {
- GregorianCalendar cal = new GregorianCalendar();
- cal.setTime(creationTime);
- this.creationTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);
- }
- }
/**
* @return the deployTime
@@ -133,19 +115,6 @@ public class ClusterState {
public void setDeployTime(XMLGregorianCalendar deployTime) {
this.deployTime = deployTime;
}
-
- /**
- * @param creationTime the creationTime to set
- */
- public void setDeployTime(Date deployTime) throws Exception {
- if (deployTime == null) {
- this.deployTime = null;
- } else {
- GregorianCalendar cal = new GregorianCalendar();
- cal.setTime(deployTime);
- this.deployTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);
- }
- }
/**
* @return the lastUpdateTime
@@ -160,19 +129,6 @@ public class ClusterState {
public void setLastUpdateTime(XMLGregorianCalendar lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
-
- /**
- * @param creationTime the creationTime to set
- */
- public void setLastUpdateTime(Date lastUpdateTime) throws Exception {
- if (lastUpdateTime == null) {
- this.lastUpdateTime = null;
- } else {
- GregorianCalendar cal = new GregorianCalendar();
- cal.setTime(lastUpdateTime);
- this.lastUpdateTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);
- }
- }
/**
* @return the state
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java Thu Nov 3 07:48:30 2011
@@ -18,6 +18,7 @@
package org.apache.ambari.controller;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,109 +29,141 @@ import org.apache.ambari.common.rest.ent
import org.apache.ambari.common.rest.entities.Component;
import org.apache.ambari.components.ComponentPlugin;
import org.apache.ambari.components.impl.XmlComponentDefinition;
+import org.apache.ambari.datastore.DataStoreFactory;
+import org.apache.ambari.datastore.PersistentDataStore;
public class Cluster {
/*
+ * Data Store
+ */
+ private PersistentDataStore dataStore = DataStoreFactory.getDataStore(DataStoreFactory.ZOOKEEPER_TYPE);
+
+ /*
* Latest revision of cluster definition
*/
- private long latestRevision = 0;
+ private String clusterName = null;
+ private int latestRevisionNumber = -1;
+ private ClusterDefinition latestDefinition = null;
- /**
- * @return the latestRevision
- */
- public long getLatestRevision() {
- return latestRevision;
- }
-
/*
* Map of cluster revision to cluster definition
*/
- private final Map<Long, ClusterDefinition> clusterDefinitionRevisionsList =
- new ConcurrentHashMap<Long, ClusterDefinition>();
- private ClusterState clusterState;
- private ClusterDefinition definition;
+ private final Map<Integer, ClusterDefinition> clusterDefinitionRevisionsList =
+ new ConcurrentHashMap<Integer, ClusterDefinition>();
private final Map<String, ComponentPlugin> plugins =
- new HashMap<String, ComponentPlugin>();
+ new HashMap<String, ComponentPlugin>();
+ public Cluster (String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public Cluster (ClusterDefinition c, ClusterState cs) throws Exception {
+ this.clusterName = c.getName();
+ this.updateClusterDefinition(c);
+ this.updateClusterState(cs);
+ }
+
+ public synchronized void init () throws Exception {
+ this.latestRevisionNumber = dataStore.retrieveLatestClusterRevisionNumber(clusterName);
+ this.latestDefinition = dataStore.retrieveClusterDefinition(clusterName, this.latestRevisionNumber);
+ loadPlugins(this.latestDefinition);
+ //this.clusterState = dataStore.retrieveClusterState(clusterName);
+ this.clusterDefinitionRevisionsList.put(this.latestRevisionNumber, this.latestDefinition);
+ }
+
/**
* @return the clusterDefinition
*/
- public synchronized ClusterDefinition getClusterDefinition(long revision) {
- return clusterDefinitionRevisionsList.get(revision);
+ public synchronized ClusterDefinition getClusterDefinition(int revision) throws IOException {
+ ClusterDefinition cdef = null;
+ if (revision < 0) {
+ cdef = this.latestDefinition;
+ } else {
+ if (!this.clusterDefinitionRevisionsList.containsKey(revision)) {
+ cdef = dataStore.retrieveClusterDefinition(clusterName, revision);
+ if (!this.clusterDefinitionRevisionsList.containsKey(revision)) {
+ this.clusterDefinitionRevisionsList.put(revision, cdef);
+ }
+ } else {
+ cdef = this.clusterDefinitionRevisionsList.get(revision);
+ }
+ }
+ return cdef;
}
/**
- * @return the latest clusterDefinition
+ * @return the latestRevision
*/
- public synchronized ClusterDefinition getLatestClusterDefinition() {
- return definition;
+ public int getLatestRevisionNumber() {
+ return this.latestRevisionNumber;
}
/**
* @return Add Cluster definition
*/
- public synchronized
- void addClusterDefinition(ClusterDefinition c) throws Exception {
- this.latestRevision++;
- clusterDefinitionRevisionsList.put((long)this.latestRevision, c);
- definition = c;
- // find the plugins for the current definition of the cluster
- Stacks context = Stacks.getInstance();
- Stack bp = context.getStack(c.getStackName(),
- Integer.parseInt(c.getStackRevision()));
+ public synchronized void updateClusterDefinition(ClusterDefinition c) throws Exception {
+ this.latestRevisionNumber = dataStore.storeClusterDefinition(c);
+ this.clusterDefinitionRevisionsList.put(this.latestRevisionNumber, c);
+ this.latestDefinition = c;
- while (bp != null) {
- for(Component comp: bp.getComponents()) {
- String name = comp.getName();
- if (!plugins.containsKey(name) && comp.getDefinition() != null) {
- plugins.put(name, new XmlComponentDefinition(comp.getDefinition()));
- }
- }
+ // find the plugins for the current definition of the cluster
+ loadPlugins(c);
+ }
+
+ /*
+ * Load plugins for the current definition of the cluster
+ */
+ private void loadPlugins (ClusterDefinition c) throws Exception {
- // go up to the parent
- if (bp.getParentName() != null) {
- bp = context.getStack(bp.getParentName(),
- Integer.parseInt(bp.getParentRevision()));
- } else {
- bp = null;
+ Stacks context = Stacks.getInstance();
+ Stack bp = context.getStack(c.getStackName(),
+ Integer.parseInt(c.getStackRevision()));
+
+ while (bp != null) {
+ for(Component comp: bp.getComponents()) {
+ String name = comp.getName();
+ if (!plugins.containsKey(name) && comp.getDefinition() != null) {
+ plugins.put(name, new XmlComponentDefinition(comp.getDefinition()));
+ }
+ }
+
+ // go up to the parent
+ if (bp.getParentName() != null) {
+ bp = context.getStack(bp.getParentName(),
+ Integer.parseInt(bp.getParentRevision()));
+ } else {
+ bp = null;
+ }
}
- }
}
/**
- * @return the clusterDefinitionList
- */
- public Map<Long, ClusterDefinition> getClusterDefinitionRevisionsList() {
- return clusterDefinitionRevisionsList;
- }
-
- /**
* @return the clusterState
*/
- public ClusterState getClusterState() {
- return clusterState;
+ public ClusterState getClusterState() throws IOException {
+ return dataStore.retrieveClusterState(this.clusterName);
}
/**
* @param clusterState the clusterState to set
*/
- public void setClusterState(ClusterState clusterState) {
- this.clusterState = clusterState;
+ public void updateClusterState(ClusterState clusterState) throws IOException {
+ dataStore.storeClusterState(this.clusterName, clusterState);
}
- public synchronized String getName() {
- return definition.getName();
+ public String getName() {
+ return this.latestDefinition.getName();
}
public synchronized Iterable<String> getComponents() {
- return plugins.keySet();
+ return this.plugins.keySet();
}
public synchronized
ComponentPlugin getComponentDefinition(String component) {
- return plugins.get(component);
+ return this.plugins.get(component);
}
}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Clusters.java Thu Nov 3 07:48:30 2011
@@ -17,11 +17,11 @@
*/
package org.apache.ambari.controller;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.StringTokenizer;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.ws.rs.WebApplicationException;
@@ -33,19 +33,22 @@ import org.apache.ambari.common.rest.ent
import org.apache.ambari.common.rest.entities.ClusterState;
import org.apache.ambari.common.rest.entities.Node;
import org.apache.ambari.common.rest.entities.RoleToNodes;
+import org.apache.ambari.datastore.DataStoreFactory;
+import org.apache.ambari.datastore.PersistentDataStore;
import org.apache.ambari.resource.statemachine.ClusterFSM;
import org.apache.ambari.resource.statemachine.StateMachineInvoker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class Clusters {
+ // TODO: replace system.out.print by LOG
private static Log LOG = LogFactory.getLog(Clusters.class);
/*
* Operational clusters include both active and inactive clusters
*/
protected ConcurrentHashMap<String, Cluster> operational_clusters = new ConcurrentHashMap<String, Cluster>();
-
+ protected PersistentDataStore dataStore = DataStoreFactory.getDataStore(DataStoreFactory.ZOOKEEPER_TYPE);
private static Clusters ClustersTypeRef=null;
@@ -129,8 +132,12 @@ public class Clusters {
cluster124.setRoleToNodesMap(rnm);
try {
- addCluster(cluster123, false);
- addCluster(cluster124, false);
+ if (!clusterExists(cluster123.getName())) {
+ addCluster(cluster123, false);
+ }
+ if (!clusterExists(cluster124.getName())) {
+ addCluster(cluster124, false);
+ }
} catch (Exception e) {
e.printStackTrace();
}
@@ -148,13 +155,56 @@ public class Clusters {
}
/*
+ * Wrapper method over datastore API
+ */
+ public boolean clusterExists(String clusterName) throws IOException {
+ int x = 0;
+ if (!this.operational_clusters.containsKey(clusterName) &&
+ dataStore.clusterExists(clusterName) == false) {
+ return false;
+ }
+ return true;
+ }
+
+ /*
+ * Get the cluster by name
+ * Wrapper over datastore API
+ */
+ public synchronized Cluster getClusterByName(String clusterName) throws Exception {
+ if (clusterExists(clusterName)) {
+ if (!this.operational_clusters.containsKey(clusterName)) {
+ Cluster cls = new Cluster(clusterName);
+ cls.init();
+ this.operational_clusters.put(clusterName, cls);
+ }
+ return this.operational_clusters.get(clusterName);
+ } else {
+ return null;
+ }
+ }
+
+ /*
+ * Purge the cluster entry from memory and the data store
+ */
+ public synchronized void purgeClusterEntry (String clusterName) throws IOException {
+ dataStore.deleteCluster(clusterName);
+ this.operational_clusters.remove(clusterName);
+ }
+
+ /*
+ * Add Cluster Entry
+ */
+ public synchronized Cluster addClusterEntry (ClusterDefinition cdef, ClusterState cs) throws Exception {
+ Cluster cls = new Cluster (cdef, cs);
+ this.operational_clusters.put(cdef.getName(), cls);
+ return cls;
+ }
+
+ /*
* Rename the cluster
*/
- public void renameCluster(String clusterName, String new_name) throws Exception {
- /*
- *
- */
- if (!this.operational_clusters.containsKey(clusterName)) {
+ public synchronized void renameCluster(String clusterName, String new_name) throws Exception {
+ if (!clusterExists(clusterName)) {
String msg = "Cluster ["+clusterName+"] does not exist";
throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
}
@@ -164,155 +214,172 @@ public class Clusters {
throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.BAD_REQUEST)).get());
}
- synchronized (operational_clusters) {
- /*
- * Check if cluster state is ATTAIC, If yes update the name
- * don't make new revision of cluster definition as it is in ATTIC state
- */
- if (!this.operational_clusters.get(clusterName).getClusterState().getState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
- String msg = "Cluster state is not ATTIC. Cluster is only allowed to be renamed in ATTIC state";
- throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_ACCEPTABLE)).get());
- }
-
- Cluster x = this.operational_clusters.get(clusterName);
- x.getLatestClusterDefinition().setName(new_name);
- this.operational_clusters.remove(clusterName);
- this.operational_clusters.put(new_name, x);
+ /*
+ * Check if cluster state is ATTAIC, If yes update the name
+ * don't make new revision of cluster definition as it is in ATTIC state
+ */
+ if (!getClusterByName(clusterName).getClusterState().getState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
+ String msg = "Cluster state is not ATTIC. Cluster is only allowed to be renamed in ATTIC state";
+ throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_ACCEPTABLE)).get());
}
-
+
+ Cluster x = this.getClusterByName(clusterName);
+ ClusterDefinition cdef = x.getClusterDefinition(-1);
+ cdef.setName(new_name);
+ ClusterState cs = x.getClusterState();
+ this.addClusterEntry(cdef, cs);
+ this.purgeClusterEntry(clusterName);
}
/*
* Create/Update cluster definition
* TODO: As nodes or role to node association changes, validate key services nodes are not removed
*/
- public ClusterDefinition updateCluster(String clusterName, ClusterDefinition c, boolean dry_run) throws Exception {
-
+ public synchronized ClusterDefinition updateCluster(String clusterName, ClusterDefinition c, boolean dry_run) throws Exception {
/*
- * if cluster does not exist create it
+ * Add new cluster if cluster does not exist
*/
- synchronized (this.operational_clusters) {
- if (!this.operational_clusters.containsKey(clusterName)) {
- return addCluster(c, dry_run);
- }
+ if (!clusterExists(clusterName)) {
+ return addCluster(c, dry_run);
}
- Cluster cls = this.operational_clusters.get(clusterName);
/*
* Time being we will keep entire updated copy as new revision
*/
+ Cluster cls = getClusterByName(clusterName);
ClusterDefinition newcd = new ClusterDefinition ();
+ newcd.setName(clusterName);
+ if (c.getStackName() != null) {
+ newcd.setStackName(c.getStackName());
+ } else {
+ newcd.setStackName(cls.getClusterDefinition(-1).getStackName());
+ }
+ if (c.getStackRevision() != null) {
+ newcd.setStackRevision(c.getStackRevision());
+ } else {
+ newcd.setStackRevision(cls.getClusterDefinition(-1).getStackRevision());
+ }
+ if (c.getDescription() != null) {
+ newcd.setDescription(c.getDescription());
+ } else {
+ newcd.setDescription(cls.getClusterDefinition(-1).getDescription());
+ }
+ if (c.getGoalState() != null) {
+ newcd.setGoalState(c.getGoalState());
+ } else {
+ newcd.setGoalState(cls.getClusterDefinition(-1).getGoalState());
+ }
+ if (c.getActiveServices() != null) {
+ newcd.setActiveServices(c.getActiveServices());
+ } else {
+ newcd.setActiveServices(cls.getClusterDefinition(-1).getActiveServices());
+ }
- synchronized (cls.getClusterDefinitionRevisionsList()) {
- newcd.setName(clusterName);
- if (c.getStackName() != null) {
- newcd.setStackName(c.getStackName());
- } else {
- newcd.setStackName(cls.getLatestClusterDefinition().getStackName());
- }
- if (c.getStackRevision() != null) {
- newcd.setStackRevision(c.getStackRevision());
- } else {
- newcd.setStackRevision(cls.getLatestClusterDefinition().getStackRevision());
- }
- if (c.getDescription() != null) {
- newcd.setDescription(c.getDescription());
- } else {
- newcd.setDescription(cls.getLatestClusterDefinition().getDescription());
- }
- if (c.getGoalState() != null) {
- newcd.setGoalState(c.getGoalState());
- } else {
- newcd.setGoalState(cls.getLatestClusterDefinition().getGoalState());
- }
- if (c.getActiveServices() != null) {
- newcd.setActiveServices(c.getActiveServices());
- } else {
- newcd.setActiveServices(cls.getLatestClusterDefinition().getActiveServices());
- }
-
- /*
- * TODO: What if controller is crashed after updateClusterNodesReservation
- * before updating and adding new revision of cluster definition?
- */
- boolean updateNodesReservation = false;
- boolean updateNodeToRolesAssociation = false;
- if (c.getNodes() != null) {
- newcd.setNodes(c.getNodes());
- updateNodesReservation = true;
-
- } else {
- newcd.setNodes(cls.getLatestClusterDefinition().getNodes());
- }
- if (c.getRoleToNodes() != null) {
- newcd.setRoleToNodesMap(c.getRoleToNodes());
- updateNodeToRolesAssociation = true;
-
- }
-
- /*
- * if Cluster goal state is ATTIC then no need to take any action other than
- * updating the cluster definition.
- */
- if (newcd.getGoalState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
- cls.getClusterState().setLastUpdateTime(new Date());
- cls.addClusterDefinition(newcd);
- /*
- * TODO: Persist the latest cluster definition under new revision
- */
- return cls.getLatestClusterDefinition();
- }
-
- /*
- * Validate the updated cluster definition
- */
- validateClusterDefinition(newcd);
-
- /*
- * TODO: If dry_run then return the newcd at this point
- */
- if (dry_run) {
- System.out.println ("Dry run for update cluster..");
- return newcd;
- }
-
- /*
- * Update the nodes reservation and node to roles association
- */
- if (updateNodesReservation) {
- updateClusterNodesReservation (cls.getName(), c);
- }
- if (updateNodeToRolesAssociation) {
- updateNodeToRolesAssociation(newcd.getNodes(), c.getRoleToNodes());
- }
-
- /*
- * Update the last update time & revision
- */
- cls.getClusterState().setLastUpdateTime(new Date());
- cls.addClusterDefinition(newcd);
+ /*
+ * TODO: What if controller is crashed after updateClusterNodesReservation
+ * before updating and adding new revision of cluster definition?
+ */
+ boolean updateNodesReservation = false;
+ boolean updateNodeToRolesAssociation = false;
+ if (c.getNodes() != null) {
+ newcd.setNodes(c.getNodes());
+ updateNodesReservation = true;
- /*
- * TODO: Persist the latest cluster definition under new revision
- */
+ } else {
+ newcd.setNodes(cls.getClusterDefinition(-1).getNodes());
+ }
+ if (c.getRoleToNodes() != null) {
+ newcd.setRoleToNodesMap(c.getRoleToNodes());
+ updateNodeToRolesAssociation = true;
- /*
- * Invoke state machine event
- */
- ClusterFSM clusterFSM = StateMachineInvoker.
- getStateMachineClusterInstance(cls.getName());
- if(c.getGoalState().equals(ClusterState.CLUSTER_STATE_ACTIVE)) {
- clusterFSM.activate();
- } else if(c.getGoalState().
- equals(ClusterState.CLUSTER_STATE_INACTIVE)) {
- clusterFSM.deactivate();
- } else if(c.getGoalState().
- equals(ClusterState.CLUSTER_STATE_ATTIC)) {
- clusterFSM.deactivate();
- clusterFSM.terminate();
- }
+ }
+
+ /*
+ * if Cluster goal state is ATTIC then no need to take any action other than
+ * updating the cluster definition.
+ */
+ if (newcd.getGoalState().equals(ClusterState.CLUSTER_STATE_ATTIC)) {
+ ClusterState cs = cls.getClusterState();
+ cs.setLastUpdateTime(Util.getXMLGregorianCalendar(new Date()));
+ cls.updateClusterDefinition(newcd);
+ cls.updateClusterState(cs);
+ return cls.getClusterDefinition(-1);
+ }
+
+ /*
+ * Validate the updated cluster definition
+ */
+ validateClusterDefinition(newcd);
+
+ /*
+ * If dry_run then return the newcd at this point
+ */
+ if (dry_run) {
+ System.out.println ("Dry run for update cluster..");
+ return newcd;
+ }
+
+ /*
+ * Udate the new cluster definition
+ */
+ ClusterState cs = cls.getClusterState();
+ cs.setLastUpdateTime(Util.getXMLGregorianCalendar(new Date()));
+ cls.updateClusterDefinition(newcd);
+ cls.updateClusterState(cs);
+
+ /*
+ * Update the nodes reservation and node to roles association
+ */
+ if (updateNodesReservation) {
+ updateClusterNodesReservation (cls.getName(), c);
+ }
+ if (updateNodeToRolesAssociation) {
+ updateNodeToRolesAssociation(newcd.getNodes(), c.getRoleToNodes());
+ }
+
+ /*
+ * Invoke state machine event
+ */
+ ClusterFSM clusterFSM = StateMachineInvoker.
+ getStateMachineClusterInstance(cls.getName());
+ if(c.getGoalState().equals(ClusterState.CLUSTER_STATE_ACTIVE)) {
+ clusterFSM.activate();
+ } else if(c.getGoalState().
+ equals(ClusterState.CLUSTER_STATE_INACTIVE)) {
+ clusterFSM.deactivate();
+ } else if(c.getGoalState().
+ equals(ClusterState.CLUSTER_STATE_ATTIC)) {
+ clusterFSM.deactivate();
+ clusterFSM.terminate();
}
- return cls.getLatestClusterDefinition();
+
+ return cls.getClusterDefinition(-1);
+ }
+
+ /*
+ * Add default values for new cluster definition
+ */
+ private void setNewClusterDefaults(ClusterDefinition cdef) throws Exception {
+ /*
+ * Populate the input cluster definition w/ default values
+ */
+ if (cdef.getDescription() == null) { cdef.setDescription("Ambari cluster : "+cdef.getName());
+ }
+ if (cdef.getGoalState() == null) { cdef.setGoalState(ClusterDefinition.GOAL_STATE_INACTIVE);
+ }
+
+ /*
+ * If its new cluster, do not specify the revision, set it to null. A revision number is obtained
+ * after persisting the definition
+ */
+ cdef.setRevision(null);
+
+ // TODO: Add the list of active services by querying pluging component.
+ if (cdef.getActiveServices() == null) {
+ List<String> services = new ArrayList<String>();
+ services.add("ALL");
+ cdef.setActiveServices(services);
+ }
}
/*
@@ -329,36 +396,32 @@ public class Clusters {
* are in UNREGISTERED state).
*/
private ClusterDefinition addCluster(ClusterDefinition cdef, boolean dry_run) throws Exception {
-
+
/*
* TODO: Validate the cluster definition and set the default
+ *
*/
validateClusterDefinition(cdef);
- /*
- * Check if cluster already exist
+ /*
+ * Add the defaults for optional values, if not set
*/
- if (operational_clusters.containsKey(cdef.getName())) {
- String msg = "Cluster ["+cdef.getName()+"] already exists";
- throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.CONFLICT)).get());
- }
-
- /*
+ setNewClusterDefaults(cdef);
+
+ /*
* Create new cluster object
*/
Date requestTime = new Date();
- Cluster cls = new Cluster();
+
ClusterState clsState = new ClusterState();
- clsState.setCreationTime(requestTime);
- clsState.setLastUpdateTime(requestTime);
- clsState.setDeployTime((Date)null);
+ clsState.setCreationTime(Util.getXMLGregorianCalendar(requestTime));
+ clsState.setLastUpdateTime(Util.getXMLGregorianCalendar(requestTime));
+ clsState.setDeployTime(Util.getXMLGregorianCalendar((Date)null));
if (cdef.getGoalState().equals(ClusterDefinition.GOAL_STATE_ATTIC)) {
clsState.setState(ClusterState.CLUSTER_STATE_ATTIC);
} else {
clsState.setState(ClusterDefinition.GOAL_STATE_INACTIVE);
}
- cls.addClusterDefinition(cdef);
- cls.setClusterState(clsState);
/*
* If dry run then update roles to nodes map, if not specified explicitly
@@ -371,6 +434,13 @@ public class Clusters {
}
/*
+ * Persist the new cluster and add entry to cache
+ * TODO: Persist reserved nodes against the cluster & service/role?
+ *
+ */
+ Cluster cls = this.addClusterEntry(cdef, clsState);
+
+ /*
* Update cluster nodes reservation.
*/
if (cdef.getNodes() != null
@@ -396,20 +466,12 @@ public class Clusters {
}
/*
- * TODO: Persist the cluster definition to data store as a initial version r0.
- * Persist reserved nodes against the cluster & service/role
- */
-
- // Add the cluster to the list, after definition is persisted
- this.operational_clusters.put(cdef.getName(), cls);
-
- /*
* Activate the cluster if the goal state is ACTIVE
* TODO: What to do if activate fails ???
*/
if(cdef.getGoalState().equals(ClusterDefinition.GOAL_STATE_ACTIVE)) {
org.apache.ambari.resource.statemachine.ClusterFSM cs =
- StateMachineInvoker.createCluster(cls,cls.getLatestRevision(),
+ StateMachineInvoker.createCluster(cls,cls.getLatestRevisionNumber(),
cls.getClusterState());
cs.activate();
}
@@ -426,7 +488,7 @@ public class Clusters {
}
/*
- * Validates the cluster definition
+ * Validate the cluster definition
* TODO: Validate each role has enough nodes associated with it.
*/
private void validateClusterDefinition (ClusterDefinition cdef) throws Exception {
@@ -469,20 +531,6 @@ public class Clusters {
}
}
- /*
- * Populate the input cluster definition w/ default values
- */
- if (cdef.getDescription() == null) { cdef.setDescription("Ambari cluster : "+cdef.getName());
- }
- if (cdef.getGoalState() == null) { cdef.setGoalState(cdef.GOAL_STATE_INACTIVE);
- }
-
- // TODO: Add the list of active services by querying pluging component.
- if (cdef.getActiveServices() == null) {
- List<String> services = new ArrayList<String>();
- services.add("ALL");
- cdef.setActiveServices(services);
- }
/*
* Check if all the nodes explicitly specified in the RoleToNodesMap belong the cluster node range specified
@@ -517,7 +565,7 @@ public class Clusters {
/*
* Reserve the nodes as specified in the node range expressions
- * -- throw exception if any nodes are pre-associated with other cluster
+ * -- throw exception, if any nodes are pre-associated with other cluster
*/
List<String> nodes_currently_allocated_to_cluster = new ArrayList<String>();
for (Node n : Nodes.getInstance().getNodes().values()) {
@@ -593,7 +641,7 @@ public class Clusters {
}
/*
- * This function disassociate the node from the cluster. The clsuterID associated w/
+ * This function disassociate all the nodes from the cluster. The clsuterID associated w/
* cluster will be reset by heart beat when node reports all clean.
*/
public synchronized void releaseClusterNodes (String clusterName) throws Exception {
@@ -644,14 +692,14 @@ public class Clusters {
* Get Cluster stack
*/
public Stack getClusterStack(String clusterName, boolean expanded) throws Exception {
- if (!this.operational_clusters.containsKey(clusterName)) {
+ if (!this.clusterExists(clusterName)) {
String msg = "Cluster ["+clusterName+"] does not exist";
throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
}
- Cluster cls = this.operational_clusters.get(clusterName);
- String stackName = cls.getLatestClusterDefinition().getStackName();
- int stackRevision = Integer.parseInt(cls.getLatestClusterDefinition().getStackRevision());
+ Cluster cls = this.getClusterByName(clusterName);
+ String stackName = cls.getClusterDefinition(-1).getStackName();
+ int stackRevision = Integer.parseInt(cls.getClusterDefinition(-1).getStackRevision());
Stack bp;
if (!expanded) {
@@ -663,70 +711,61 @@ public class Clusters {
return bp;
}
-
/*
* Delete Cluster
- * Delete operation will bring the cluster to ATTIC state and then remove the
- * cluster definition from the controller
- * When cluster state transitions to ATTIC, it should check if the cluster definition is
- * part of tobe_deleted_clusters map and then delete the definition.
- * TODO: Delete definition from both operational_clusters and operational_clusters_id_name map and to_be_deleted
- * clusters list.
- */
- public void deleteCluster(String clusterName) throws Exception {
- synchronized (this.operational_clusters) {
- for (Cluster cls : this.operational_clusters.values()) {
- if (cls.getLatestClusterDefinition().getName().equals(clusterName)) {
- synchronized (cls) {
- ClusterDefinition cdf = new ClusterDefinition();
- cdf.setName(clusterName);
- cdf.setGoalState(ClusterState.CLUSTER_STATE_ATTIC);
- updateCluster(clusterName, cdf, false);
- /* Update cluster state, mark it "to be deleted" when gets to ATTIC state
- * TODO: PERSIST the new flag in the cluster state
- */
- cls.getClusterState().setMarkForDeletionWhenInAttic(true);
-
- }
- }
- }
- }
- }
-
- /*
- * Get the cluster by name
+ * Delete operation will mark the cluster to_be_deleted and then set the goal state to ATTIC
+ * Once cluster gets to ATTIC state, background daemon should purge the cluster entry.
*/
- public Cluster getClusterByName(String clusterName) {
- return this.operational_clusters.get(clusterName);
- }
-
+ public synchronized void deleteCluster(String clusterName) throws Exception {
+
+ if (!this.clusterExists(clusterName)) {
+ System.out.println("Cluster ["+clusterName+"] does not exist!");
+ return;
+ }
+
+ /*
+ * Update the cluster definition with goal state to be ATTIC
+ */
+ Cluster cls = this.getClusterByName(clusterName);
+ ClusterDefinition cdf = new ClusterDefinition();
+ cdf.setName(clusterName);
+ cdf.setGoalState(ClusterState.CLUSTER_STATE_ATTIC);
+ cls.updateClusterDefinition(cdf);
+
+ /*
+ * Update cluster state, mark it "to be deleted"
+ */
+ ClusterState cs = cls.getClusterState();
+ cs.setMarkForDeletionWhenInAttic(true);
+ cls.updateClusterState(cs);
+ }
/*
* Get the latest cluster definition
*/
- public ClusterDefinition getLatestClusterDefinition(String clusterName) {
- return this.operational_clusters.get(clusterName).getLatestClusterDefinition();
+ public ClusterDefinition getLatestClusterDefinition(String clusterName) throws Exception {
+ return this.getClusterByName(clusterName).getClusterDefinition(-1);
}
/*
* Get Cluster Definition given name and revision
*/
- public ClusterDefinition getClusterDefinition(String clusterName, long revision) {
- return this.operational_clusters.get(clusterName).getClusterDefinition(revision);
+ public ClusterDefinition getClusterDefinition(String clusterName, int revision) throws Exception {
+ return this.getClusterByName(clusterName).getClusterDefinition(revision);
}
/*
* Get the cluster Information by name
*/
public ClusterInformation getClusterInformation (String clusterName) throws Exception {
- if (!this.operational_clusters.containsKey(clusterName)) {
+ if (!this.clusterExists(clusterName)) {
String msg = "Cluster ["+clusterName+"] does not exist";
throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
}
ClusterInformation clsInfo = new ClusterInformation();
clsInfo.setDefinition(this.getLatestClusterDefinition(clusterName));
- clsInfo.setState(this.operational_clusters.get(clusterName).getClusterState());
+ clsInfo.setState(this.getClusterByName(clusterName).getClusterState());
return clsInfo;
}
@@ -734,30 +773,32 @@ public class Clusters {
/*
* Get the cluster state
*/
- public ClusterState getClusterState(String clusterName) throws WebApplicationException {
- if (!this.operational_clusters.containsKey(clusterName)) {
+ public ClusterState getClusterState(String clusterName) throws Exception {
+ if (!this.clusterExists(clusterName)) {
String msg = "Cluster ["+clusterName+"] does not exist";
throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NOT_FOUND)).get());
}
- return this.operational_clusters.get(clusterName).getClusterState();
+ return this.getClusterByName(clusterName).getClusterState();
}
/*
* Get Cluster Information list i.e. cluster definition and cluster state
*/
- public List<ClusterInformation> getClusterInformationList(String state) {
+ public List<ClusterInformation> getClusterInformationList(String state) throws Exception {
List<ClusterInformation> list = new ArrayList<ClusterInformation>();
- for (Cluster cls : this.operational_clusters.values()) {
+ List<String> clusterNames = dataStore.retrieveClusterList();
+ for (String clsName : clusterNames) {
+ Cluster cls = this.getClusterByName(clsName);
if (state.equals("ALL")) {
ClusterInformation clsInfo = new ClusterInformation();
- clsInfo.setDefinition(cls.getLatestClusterDefinition());
+ clsInfo.setDefinition(cls.getClusterDefinition(-1));
clsInfo.setState(cls.getClusterState());
list.add(clsInfo);
} else {
if (cls.getClusterState().getState().equals(state)) {
ClusterInformation clsInfo = new ClusterInformation();
- clsInfo.setDefinition(cls.getLatestClusterDefinition());
+ clsInfo.setDefinition(cls.getClusterDefinition(-1));
clsInfo.setState(cls.getClusterState());
list.add(clsInfo);
}
@@ -770,14 +811,16 @@ public class Clusters {
* Get the list of clusters
* TODO: Get the synchronized snapshot of each cluster definition?
*/
- public List<Cluster> getClustersList(String state) {
+ public List<Cluster> getClustersList(String state) throws Exception {
List<Cluster> list = new ArrayList<Cluster>();
- if (state.equals("ALL")) {
- list.addAll(this.operational_clusters.values());
- } else {
- for (Cluster cls : this.operational_clusters.values()) {
+ List<String> clusterNames = dataStore.retrieveClusterList();
+ for (String clsName : clusterNames) {
+ Cluster cls = this.getClusterByName(clsName);
+ if (state.equals("ALL")) {
+ list.add(cls);
+ } else {
if (cls.getClusterState().getState().equals(state)) {
- list.add(cls);
+ list.add(cls);
}
}
}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Thu Nov 3 07:48:30 2011
@@ -89,7 +89,7 @@ public class HeartbeatHandler {
.getNodeState().getClusterName();
if (clusterName != null) {
clusterRev = Clusters.getInstance().
- getClusterByName(clusterName).getLatestRevision();
+ getClusterByName(clusterName).getLatestRevisionNumber();
}
ComponentAndRoleStates componentStates =
@@ -576,19 +576,23 @@ public class HeartbeatHandler {
private void inspectAgentState(HeartBeat heartbeat,
ComponentAndRoleStates componentServers)
throws IOException {
- List<AgentRoleState> agentRoleStates =
- heartbeat.getInstalledRoleStates();
- if (agentRoleStates == null) {
- return;
- }
- List<Cluster> clustersNodeBelongsTo = new ArrayList<Cluster>();
- for (AgentRoleState agentRoleState : agentRoleStates) {
- componentServers.recordRoleState(heartbeat.getHostname(),agentRoleState);
- Cluster c = Clusters.getInstance().
- getClusterByName(agentRoleState.getClusterId());
- clustersNodeBelongsTo.add(c);
- }
- checkActionResults(heartbeat, componentServers);
+ try {
+ List<AgentRoleState> agentRoleStates =
+ heartbeat.getInstalledRoleStates();
+ if (agentRoleStates == null) {
+ return;
+ }
+ List<Cluster> clustersNodeBelongsTo = new ArrayList<Cluster>();
+ for (AgentRoleState agentRoleState : agentRoleStates) {
+ componentServers.recordRoleState(heartbeat.getHostname(),agentRoleState);
+ Cluster c = Clusters.getInstance().
+ getClusterByName(agentRoleState.getClusterId());
+ clustersNodeBelongsTo.add(c);
+ }
+ checkActionResults(heartbeat, componentServers);
+ } catch (Exception e) {
+ throw new IOException (e);
+ }
}
private void checkActionResults(HeartBeat heartbeat,
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java Thu Nov 3 07:48:30 2011
@@ -85,7 +85,7 @@ public class Nodes {
public List<Node> getClusterNodes (String clusterName, String roleName, String alive) throws Exception {
List<Node> list = new ArrayList<Node>();
- ClusterDefinition c = Clusters.getInstance().operational_clusters.get(clusterName).getLatestClusterDefinition();
+ ClusterDefinition c = Clusters.getInstance().operational_clusters.get(clusterName).getClusterDefinition(-1);
if (c.getNodes() == null || c.getNodes().equals("") || Clusters.getInstance().getClusterByName(clusterName).getClusterState().getState().equalsIgnoreCase("ATTIC")) {
String msg = "No nodes are reserved for the cluster. Typically cluster in ATTIC state does not have any nodes reserved";
throw new WebApplicationException((new ExceptionResponse(msg, Response.Status.NO_CONTENT)).get());
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Stacks.java Thu Nov 3 07:48:30 2011
@@ -293,8 +293,8 @@ public class Stacks {
public Hashtable<String, String> getClusterReferencedStacksList() throws Exception {
Hashtable<String, String> clusterStacks = new Hashtable<String, String>();
for (Cluster c : Clusters.getInstance().operational_clusters.values()) {
- String cBPName = c.getLatestClusterDefinition().getStackName();
- String cBPRevision = c.getLatestClusterDefinition().getStackRevision();
+ String cBPName = c.getClusterDefinition(-1).getStackName();
+ String cBPRevision = c.getClusterDefinition(-1).getStackRevision();
Stack bpx = this.getStack(cBPName, Integer.parseInt(cBPRevision));
clusterStacks.put(cBPName+"-"+cBPRevision, "");
while (bpx.getParentName() != null) {
Added: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java?rev=1196984&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java (added)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java Thu Nov 3 07:48:30 2011
@@ -0,0 +1,20 @@
+package org.apache.ambari.controller;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.XMLGregorianCalendar;
+
+public class Util {
+
+ public static XMLGregorianCalendar getXMLGregorianCalendar (Date date) throws Exception {
+ if (date == null) {
+ return null;
+ }
+ GregorianCalendar cal = new GregorianCalendar();
+ cal.setTime(date);
+ return DatatypeFactory.newInstance().newXMLGregorianCalendar(cal);
+ }
+
+}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/rest/config/Examples.java Thu Nov 3 07:48:30 2011
@@ -29,6 +29,7 @@ import org.apache.ambari.common.rest.ent
import org.apache.ambari.common.rest.entities.NodeState;
import org.apache.ambari.common.rest.entities.RoleToNodes;
import org.apache.ambari.common.rest.entities.Stack;
+import org.apache.ambari.controller.Util;
import org.apache.ambari.common.rest.entities.StackInformation;
public class Examples {
@@ -77,8 +78,8 @@ public class Examples {
CLUSTER_STATE.setState("ATTIC");
try {
- CLUSTER_STATE.setCreationTime(new Date());
- CLUSTER_STATE.setDeployTime(new Date());
+ CLUSTER_STATE.setCreationTime(Util.getXMLGregorianCalendar(new Date()));
+ CLUSTER_STATE.setDeployTime(Util.getXMLGregorianCalendar(new Date()));
} catch (Exception e) {
}
NODE.setName("localhost");
Added: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java?rev=1196984&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java (added)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/DataStoreFactory.java Thu Nov 3 07:48:30 2011
@@ -0,0 +1,24 @@
+package org.apache.ambari.datastore;
+
+import java.io.IOException;
+
+import org.apache.ambari.datastore.impl.ZookeeperDS;
+
+public class DataStoreFactory {
+
+ public static String ZOOKEEPER_TYPE = "zookeeper";
+
+ public static PersistentDataStore getDataStore(String storeType) {
+ if (storeType.equalsIgnoreCase(ZOOKEEPER_TYPE)) {
+ return ZookeeperDS.getInstance();
+ }
+ return null;
+ }
+
+ public static void main (String args) {
+ try {
+ PersistentDataStore ds = DataStoreFactory.getDataStore(ZOOKEEPER_TYPE);
+ } catch (Exception e) {
+ }
+ }
+}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/PersistentDataStore.java Thu Nov 3 07:48:30 2011
@@ -3,9 +3,9 @@ package org.apache.ambari.datastore;
import java.io.IOException;
import java.util.List;
+import org.apache.ambari.common.rest.entities.ClusterState;
import org.apache.ambari.common.rest.entities.Stack;
import org.apache.ambari.common.rest.entities.ClusterDefinition;
-import org.apache.ambari.common.rest.entities.ClusterState;
public interface PersistentDataStore {
@@ -16,10 +16,30 @@ public interface PersistentDataStore {
public void close () throws IOException;
/**
- * Persist the cluster definition.
- *
- * Create new cluster entry, if one does not exist already else add new revision to existing cluster
- * Return the revision number for each newly added cluster definition
+ * Check if cluster exists
+ */
+ public boolean clusterExists(String clusterName) throws IOException;
+
+ /**
+ * Get Latest cluster Revision Number
+ */
+ public int retrieveLatestClusterRevisionNumber(String clusterName) throws IOException;
+
+ /**
+ * Store the cluster state
+ */
+ public void storeClusterState (String clusterName, ClusterState clsState) throws IOException;
+
+ /**
+ * Store the cluster state
+ */
+ public ClusterState retrieveClusterState (String clusterName) throws IOException;
+
+ /**
+ * Store the cluster definition.
+ *
+ * Return the revision number for new or updated cluster definition
+ * If cluster revision is not null then, check if existing revision being updated in the store is same.
*/
public int storeClusterDefinition (ClusterDefinition clusterDef) throws IOException;
@@ -39,10 +59,9 @@ public interface PersistentDataStore {
}
/**
- * Retrieve all cluster definitions with their latest revisions
- *
+ * Retrieve list of existing cluster names
*/
- public List<NameRevisionPair> retrieveClusterList () throws IOException;
+ public List<String> retrieveClusterList () throws IOException;
/**
Added: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java?rev=1196984&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java (added)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/datastore/impl/ZookeeperDS.java Thu Nov 3 07:48:30 2011
@@ -0,0 +1,359 @@
+package org.apache.ambari.datastore.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.common.rest.entities.ClusterDefinition;
+import org.apache.ambari.common.rest.entities.ClusterState;
+import org.apache.ambari.common.rest.entities.Stack;
+import org.apache.ambari.common.util.JAXBUtil;
+import org.apache.ambari.controller.Stacks;
+import org.apache.ambari.datastore.PersistentDataStore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class ZookeeperDS implements PersistentDataStore, Watcher {
+
+ private static final String DEFAULT_ZOOKEEPER_ADDRESS="localhost:2181";
+ private static final String ZOOKEEPER_ROOT_PATH="/ambari";
+ private static final String ZOOKEEPER_CLUSTERS_ROOT_PATH=ZOOKEEPER_ROOT_PATH+"/clusters";
+ private static final String ZOOKEEPER_STACKS_ROOT_PATH=ZOOKEEPER_ROOT_PATH+"/stacks";
+
+ private ZooKeeper zk;
+ private String credential = null;
+ private boolean zkCoonected = false;
+
+ private static ZookeeperDS ZookeeperDSRef=null;
+ private ZookeeperDS() {
+ /*
+ * TODO: Read ZooKeeper address and credential from config file
+ */
+ String zookeeperAddress = DEFAULT_ZOOKEEPER_ADDRESS;
+ try {
+ /*
+ * Connect to ZooKeeper server
+ */
+ zk = new ZooKeeper(zookeeperAddress, 600000, this);
+ if(credential != null) {
+ zk.addAuthInfo("digest", credential.getBytes());
+ }
+
+ while (!this.zkCoonected) {
+ Thread.sleep(5000);
+ System.out.println("Waiting for ZK connection");
+ }
+
+ /*
+ * Create top level directories
+ */
+ createDirectory (ZOOKEEPER_ROOT_PATH, new byte[0], true);
+ createDirectory (ZOOKEEPER_CLUSTERS_ROOT_PATH, new byte[0], true);
+ createDirectory (ZOOKEEPER_STACKS_ROOT_PATH, new byte[0], true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static synchronized ZookeeperDS getInstance() {
+ if(ZookeeperDSRef == null) {
+ ZookeeperDSRef = new ZookeeperDS();
+ }
+ return ZookeeperDSRef;
+ }
+
+ public Object clone() throws CloneNotSupportedException {
+ throw new CloneNotSupportedException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean clusterExists(String clusterName) throws IOException {
+ try {
+ if (zk.exists(ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName, false) == null) {
+ return false;
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return true;
+ }
+
+ @Override
+ public synchronized int storeClusterDefinition(ClusterDefinition clusterDef) throws IOException {
+ /*
+ * Update the cluster node
+ */
+ try {
+ Stat stat = new Stat();
+ String clusterPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterDef.getName();
+ int newRev = 0;
+ String clusterRevisionPath = clusterPath+"/"+newRev;
+ String clusterLatestRevisionNumberPath = clusterPath+"/latestRevisionNumber";
+ if (zk.exists(clusterPath, false) == null) {
+ /*
+ * create cluster path with revision 0, create cluster latest revision node
+ * storing the latest revision of cluster definition.
+ */
+ createDirectory (clusterPath, new byte[0], false);
+ createDirectory (clusterRevisionPath, JAXBUtil.write(clusterDef), false);
+ createDirectory (clusterLatestRevisionNumberPath, (new Integer(newRev)).toString().getBytes(), false);
+ }else {
+ String latestRevision = new String (zk.getData(clusterLatestRevisionNumberPath, false, stat));
+ newRev = Integer.parseInt(latestRevision) + 1;
+ clusterRevisionPath = clusterPath + "/" + newRev;
+ if (clusterDef.getRevision() != null) {
+ if (!latestRevision.equals(clusterDef.getRevision())) {
+ throw new IOException ("Latest cluster definition does not match the one client intends to modify!");
+ }
+ }
+ createDirectory (clusterRevisionPath, JAXBUtil.write(clusterDef), false);
+ zk.setData(clusterLatestRevisionNumberPath, (new Integer(newRev)).toString().getBytes(), -1);
+ }
+ return newRev;
+ } catch (KeeperException e) {
+ throw new IOException (e);
+ } catch (InterruptedException e1) {
+ throw new IOException (e1);
+ }
+ }
+
+ @Override
+ public synchronized void storeClusterState(String clusterName, ClusterState clsState)
+ throws IOException {
+ /*
+ * Update the cluster state
+ */
+ try {
+ String clusterStatePath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/state";
+ if (zk.exists(clusterStatePath, false) == null) {
+ // create node for the cluster state
+ createDirectory (clusterStatePath, JAXBUtil.write(clsState), false);
+ }else {
+ zk.setData(clusterStatePath, JAXBUtil.write(clsState), -1);
+ }
+ } catch (KeeperException e) {
+ throw new IOException (e);
+ } catch (InterruptedException e1) {
+ throw new IOException (e1);
+ }
+
+ }
+
+ @Override
+ public ClusterDefinition retrieveClusterDefinition(String clusterName, int revision) throws IOException {
+ try {
+ Stat stat = new Stat();
+ String clusterRevisionPath;
+ if (revision < 0) {
+ String clusterLatestRevisionNumberPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/latestRevisionNumber";
+ String latestRevisionNumber = new String (zk.getData(clusterLatestRevisionNumberPath, false, stat));
+ clusterRevisionPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/"+latestRevisionNumber;
+ } else {
+ clusterRevisionPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/"+revision;
+ }
+ ClusterDefinition cdef = JAXBUtil.read(zk.getData(clusterRevisionPath, false, stat), ClusterDefinition.class);
+ return cdef;
+ } catch (Exception e) {
+ throw new IOException (e);
+ }
+ }
+
+ @Override
+ public ClusterState retrieveClusterState(String clusterName) throws IOException {
+ try {
+ Stat stat = new Stat();
+ String clusterStatePath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/state";
+ ClusterState clsState = JAXBUtil.read(zk.getData(clusterStatePath, false, stat), ClusterState.class);
+ return clsState;
+ } catch (Exception e) {
+ throw new IOException (e);
+ }
+ }
+
+ @Override
+ public int retrieveLatestClusterRevisionNumber(String clusterName) throws IOException {
+ int revisionNumber;
+ try {
+ Stat stat = new Stat();
+ String clusterLatestRevisionNumberPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName+"/latestRevisionNumber";
+ String latestRevisionNumber = new String (zk.getData(clusterLatestRevisionNumberPath, false, stat));
+ revisionNumber = Integer.parseInt(latestRevisionNumber);
+ } catch (Exception e) {
+ throw new IOException (e);
+ }
+ return revisionNumber;
+ }
+
+ @Override
+ public List<String> retrieveClusterList() throws IOException {
+ try {
+ List<String> children = zk.getChildren(ZOOKEEPER_CLUSTERS_ROOT_PATH, false);
+ return children;
+ } catch (KeeperException e) {
+ throw new IOException (e);
+ } catch (InterruptedException e) {
+ throw new IOException (e);
+ }
+ }
+
+ @Override
+ public void deleteCluster(String clusterName) throws IOException {
+ String clusterPath = ZOOKEEPER_CLUSTERS_ROOT_PATH+"/"+clusterName;
+ List<String> children;
+ try {
+ children = zk.getChildren(clusterPath, false);
+ // Delete all the children and then the parent node
+ for (String childPath : children) {
+ try {
+ zk.delete(childPath, -1);
+ } catch (KeeperException.NoNodeException ke) {
+ } catch (Exception e) { throw new IOException (e); }
+ }
+ zk.delete(clusterPath, -1);
+ } catch (KeeperException.NoNodeException ke) {
+ return;
+ } catch (Exception e) {
+ throw new IOException (e);
+ }
+ }
+
+ @Override
+ public void purgeClusterDefinitionRevisions(String clusterName,
+ int lessThanRevision) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void updateClusterState(String clusterName, ClusterState newstate)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public int storeStack(String stackName, Stack stack) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public Stack retrieveStack(String stackName, int revision)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<NameRevisionPair> retrieveStackList() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int deleteStack(String stackName) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void deleteStackRevisions(String stackName, int lessThanRevision)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void updateComponentState(String clusterName, String componentName,
+ String state) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getComponentState(String clusterName, String componentName)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void deleteComponentState(String clusterName, String componentName)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void updateRoleState(String clusterName, String componentName,
+ String roleName, String state) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getRoleState(String clusterName, String componentName,
+ String RoleName) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void deleteRoleState(String clusterName, String componentName,
+ String roleName) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == Event.EventType.None) {
+ // We are are being told that the state of the
+ // connection has changed
+ switch (event.getState()) {
+ case SyncConnected:
+ // In this particular example we don't need to do anything
+ // here - watches are automatically re-registered with
+ // server and any watches triggered while the client was
+ // disconnected will be delivered (in order of course)
+ this.zkCoonected = true;
+ break;
+ case Expired:
+ // It's all over
+ //running = false;
+ //commandHandler.stop();
+ break;
+ }
+ }
+
+ }
+
+ private void createDirectory(String path, byte[] initialData, boolean ignoreIfExists) throws KeeperException, InterruptedException {
+ try {
+ zk.create(path, initialData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ if(credential!=null) {
+ zk.setACL(path, Ids.CREATOR_ALL_ACL, -1);
+ }
+ System.out.println("Created path : <" + path +">");
+ } catch (KeeperException.NodeExistsException e) {
+ if (!ignoreIfExists) {
+ System.out.println("Path already exists <"+path+">");
+ throw e;
+ }
+ } catch (KeeperException.AuthFailedException e) {
+ System.out.println("Failed to authenticate for path <"+path+">");
+ throw e;
+ }
+ }
+}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java Thu Nov 3 07:48:30 2011
@@ -109,9 +109,8 @@ public class ClusterImpl implements Clus
private ClusterState clusterState;
private static Log LOG = LogFactory.getLog(ClusterImpl.class);
- public ClusterImpl(Cluster cluster, long revision,
- ClusterState clusterState)
- throws IOException {
+ public ClusterImpl(Cluster cluster, int revision,
+ ClusterState clusterState) throws IOException {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java?rev=1196984&r1=1196983&r2=1196984&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java Thu Nov 3 07:48:30 2011
@@ -76,7 +76,7 @@ public class StateMachineInvoker {
private static ConcurrentMap<String, ClusterFSM> clusters =
new ConcurrentHashMap<String, ClusterFSM>();
- public static ClusterFSM createCluster(Cluster cluster, long revision,
+ public static ClusterFSM createCluster(Cluster cluster, int revision,
ClusterState state) throws IOException {
ClusterImpl clusterFSM = new ClusterImpl(cluster, revision, state);
clusters.put(cluster.getName(), clusterFSM);