You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2012/11/07 09:13:24 UTC
svn commit: r1406489 [9/19] - in /incubator/ambari/branches/AMBARI-666: ./
ambari-agent/ ambari-agent/conf/ ambari-agent/conf/unix/
ambari-agent/src/main/puppet/manifestloader/
ambari-agent/src/main/puppet/modules/configgenerator/manifests/
ambari-agen...
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java Wed Nov 7 08:13:12 2012
@@ -31,6 +31,7 @@ import com.google.inject.assistedinject.
import com.google.inject.persist.Transactional;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.controller.ServiceComponentResponse;
import org.apache.ambari.server.orm.dao.*;
import org.apache.ambari.server.orm.entities.*;
@@ -57,6 +58,8 @@ public class ServiceComponentImpl implem
private HostComponentDesiredStateDAO hostComponentDesiredStateDAO;
@Inject
private ServiceComponentHostFactory serviceComponentHostFactory;
+ @Inject
+ private AmbariMetaInfo ambariMetaInfo;
boolean persisted = false;
private ServiceComponentDesiredStateEntity desiredStateEntity;
@@ -69,6 +72,8 @@ public class ServiceComponentImpl implem
private final boolean isClientComponent;
+
+
private void init() {
// TODO load during restart
// initialize from DB
@@ -85,12 +90,23 @@ public class ServiceComponentImpl implem
desiredStateEntity.setDesiredState(State.INIT);
this.desiredConfigs = new HashMap<String, String>();
- setDesiredStackVersion(new StackVersion(""));
+ setDesiredStackVersion(service.getDesiredStackVersion());
this.hostComponents = new HashMap<String, ServiceComponentHost>();
- // FIXME use meta data library to decide client or not
- this.isClientComponent = false;
+ StackId stackId = service.getDesiredStackVersion();
+ ComponentInfo compInfo = ambariMetaInfo.getComponentCategory(
+ stackId.getStackName(), stackId.getStackVersion(), service.getName(),
+ componentName);
+ if (compInfo == null) {
+ throw new RuntimeException("Trying to create a ServiceComponent"
+ + " not recognized in stack info"
+ + ", clusterName=" + service.getCluster().getClusterName()
+ + ", serviceName=" + service.getName()
+ + ", componentName=" + componentName
+ + ", stackInfo=" + stackId.getStackId());
+ }
+ this.isClientComponent = compInfo.isClient();
init();
}
@@ -104,8 +120,6 @@ public class ServiceComponentImpl implem
this.service = service;
this.desiredStateEntity = serviceComponentDesiredStateEntity;
- // FIXME use meta data library to decide client or not
- this.isClientComponent = false;
this.desiredConfigs = new HashMap<String, String>();
@@ -127,6 +141,20 @@ public class ServiceComponentImpl implem
desiredConfigs.put(entity.getConfigType(), entity.getVersionTag());
}
+ StackId stackId = service.getDesiredStackVersion();
+ ComponentInfo compInfo = ambariMetaInfo.getComponentCategory(
+ stackId.getStackName(), stackId.getStackVersion(), service.getName(),
+ getName());
+ if (compInfo == null) {
+ throw new RuntimeException("Trying to create a ServiceComponent"
+ + " not recognized in stack info"
+ + ", clusterName=" + service.getCluster().getClusterName()
+ + ", serviceName=" + service.getName()
+ + ", componentName=" + getName()
+ + ", stackInfo=" + stackId.getStackId());
+ }
+ this.isClientComponent = compInfo.isClient();
+
persisted = true;
}
@@ -194,6 +222,7 @@ public class ServiceComponentImpl implem
this.hostComponents.put(hostComponent.getHostName(), hostComponent);
}
+ @Override
public synchronized ServiceComponentHost addServiceComponentHost(
String hostName) throws AmbariException {
// TODO validation
@@ -303,12 +332,12 @@ public class ServiceComponentImpl implem
}
@Override
- public synchronized StackVersion getDesiredStackVersion() {
- return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackVersion.class);
+ public synchronized StackId getDesiredStackVersion() {
+ return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackId.class);
}
@Override
- public synchronized void setDesiredStackVersion(StackVersion stackVersion) {
+ public synchronized void setDesiredStackVersion(StackId stackVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting DesiredStackVersion of Service"
+ ", clusterName=" + service.getCluster().getClusterName()
@@ -322,21 +351,12 @@ public class ServiceComponentImpl implem
saveIfPersisted();
}
- private synchronized Map<String, String> getConfigVersions() {
- Map<String, String> configVersions = new HashMap<String, String>();
-// for (Config c : desiredConfigs.values()) {
-// configVersions.put(c.getType(), c.getVersionTag());
-// }
-// return configVersions;
- return desiredConfigs;
- }
-
@Override
public synchronized ServiceComponentResponse convertToResponse() {
ServiceComponentResponse r = new ServiceComponentResponse(
getClusterId(), service.getCluster().getClusterName(),
- service.getName(), getName(), getConfigVersions(),
- getDesiredStackVersion().getStackVersion(),
+ service.getName(), getName(), this.desiredConfigs,
+ getDesiredStackVersion().getStackId(),
getDesiredState().toString());
return r;
}
@@ -374,18 +394,10 @@ public class ServiceComponentImpl implem
}
@Override
- @Transactional
public synchronized void persist() {
if (!persisted) {
- ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
- pk.setClusterId(service.getClusterId());
- pk.setServiceName(service.getName());
- ClusterServiceEntity serviceEntity = clusterServiceDAO.findByPK(pk);
-
- desiredStateEntity.setClusterServiceEntity(serviceEntity);
- serviceComponentDesiredStateDAO.create(desiredStateEntity);
- clusterServiceDAO.merge(serviceEntity);
- desiredStateEntity = serviceComponentDesiredStateDAO.merge(desiredStateEntity);
+ persistEntities();
+ refresh();
service.refresh();
persisted = true;
} else {
@@ -393,13 +405,27 @@ public class ServiceComponentImpl implem
}
}
+ @Transactional
+ protected void persistEntities() {
+ ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
+ pk.setClusterId(service.getClusterId());
+ pk.setServiceName(service.getName());
+ ClusterServiceEntity serviceEntity = clusterServiceDAO.findByPK(pk);
+
+ desiredStateEntity.setClusterServiceEntity(serviceEntity);
+ serviceComponentDesiredStateDAO.create(desiredStateEntity);
+ clusterServiceDAO.merge(serviceEntity);
+ }
+
@Override
+ @Transactional
public void refresh() {
if (isPersisted()) {
ServiceComponentDesiredStateEntityPK pk = new ServiceComponentDesiredStateEntityPK();
pk.setComponentName(getName());
pk.setClusterId(getClusterId());
pk.setServiceName(getServiceName());
+ // TODO: desiredStateEntity is assigned in unsynchronized way, may be a bug
desiredStateEntity = serviceComponentDesiredStateDAO.findByPK(pk);
serviceComponentDesiredStateDAO.refresh(desiredStateEntity);
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java Wed Nov 7 08:13:12 2012
@@ -83,7 +83,8 @@ public class ServiceImpl implements Serv
}
@AssistedInject
- public ServiceImpl(@Assisted Cluster cluster, @Assisted String serviceName, Injector injector) {
+ public ServiceImpl(@Assisted Cluster cluster, @Assisted String serviceName,
+ Injector injector) {
this.injector = injector;
injector.injectMembers(this);
serviceEntity = new ClusterServiceEntity();
@@ -97,12 +98,13 @@ public class ServiceImpl implements Serv
this.desiredConfigs = new HashMap<String, String>();
this.components = new HashMap<String, ServiceComponent>();
- setDesiredStackVersion(new StackVersion(""));
+ setDesiredStackVersion(cluster.getDesiredStackVersion());
init();
}
@AssistedInject
- public ServiceImpl(@Assisted Cluster cluster, @Assisted ClusterServiceEntity serviceEntity, Injector injector) {
+ public ServiceImpl(@Assisted Cluster cluster, @Assisted ClusterServiceEntity
+ serviceEntity, Injector injector) {
this.injector = injector;
injector.injectMembers(this);
this.serviceEntity = serviceEntity;
@@ -118,14 +120,18 @@ public class ServiceImpl implements Serv
this.configs = new HashMap<String, Config>();
if (!serviceEntity.getServiceComponentDesiredStateEntities().isEmpty()) {
- for (ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity : serviceEntity.getServiceComponentDesiredStateEntities()) {
+ for (ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity
+ : serviceEntity.getServiceComponentDesiredStateEntities()) {
components.put(serviceComponentDesiredStateEntity.getComponentName(),
- serviceComponentFactory.createExisting(this, serviceComponentDesiredStateEntity));
+ serviceComponentFactory.createExisting(this,
+ serviceComponentDesiredStateEntity));
}
}
- for (ServiceConfigMappingEntity mappingEntity : serviceEntity.getServiceConfigMappings()) {
- desiredConfigs.put(mappingEntity.getConfigType(), mappingEntity.getVersionTag());
+ for (ServiceConfigMappingEntity mappingEntity :
+ serviceEntity.getServiceConfigMappings()) {
+ desiredConfigs.put(mappingEntity.getConfigType(),
+ mappingEntity.getVersionTag());
}
@@ -276,12 +282,12 @@ public class ServiceImpl implements Serv
}
@Override
- public synchronized StackVersion getDesiredStackVersion() {
- return gson.fromJson(serviceDesiredStateEntity.getDesiredStackVersion(), StackVersion.class);
+ public synchronized StackId getDesiredStackVersion() {
+ return gson.fromJson(serviceDesiredStateEntity.getDesiredStackVersion(), StackId.class);
}
@Override
- public synchronized void setDesiredStackVersion(StackVersion stackVersion) {
+ public synchronized void setDesiredStackVersion(StackId stackVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting DesiredStackVersion of Service"
+ ", clusterName=" + cluster.getClusterName()
@@ -311,7 +317,7 @@ public class ServiceImpl implements Serv
cluster.getClusterName(),
getName(),
getConfigVersions(),
- getDesiredStackVersion().getStackVersion(),
+ getDesiredStackVersion().getStackId(),
getDesiredState().toString());
return r;
}
@@ -328,9 +334,9 @@ public class ServiceImpl implements Serv
+ ", clusterId=" + cluster.getClusterId()
+ ", desiredStackVersion=" + getDesiredStackVersion()
+ ", desiredState=" + getDesiredState().toString()
- + ", configs = " + getConfigVersions()
+ + ", configs = " + getConfigVersions()
+ ", components=[ ");
-
+
boolean first = true;
for(ServiceComponent sc : components.values()) {
if (!first) {
@@ -350,17 +356,10 @@ public class ServiceImpl implements Serv
}
@Override
- @Transactional
public synchronized void persist() {
if (!persisted) {
- ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());
- serviceEntity.setClusterEntity(clusterEntity);
- clusterServiceDAO.create(serviceEntity);
- serviceDesiredStateDAO.create(serviceDesiredStateEntity);
- clusterEntity.getClusterServiceEntities().add(serviceEntity);
- clusterDAO.merge(clusterEntity);
- serviceEntity = clusterServiceDAO.merge(serviceEntity);
- serviceDesiredStateEntity = serviceDesiredStateDAO.merge(serviceDesiredStateEntity);
+ persistEntities();
+ refresh();
cluster.refresh();
persisted = true;
} else {
@@ -369,6 +368,18 @@ public class ServiceImpl implements Serv
}
@Transactional
+ protected void persistEntities() {
+ ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());
+ serviceEntity.setClusterEntity(clusterEntity);
+ clusterServiceDAO.create(serviceEntity);
+ serviceDesiredStateDAO.create(serviceDesiredStateEntity);
+ clusterEntity.getClusterServiceEntities().add(serviceEntity);
+ clusterDAO.merge(clusterEntity);
+ serviceEntity = clusterServiceDAO.merge(serviceEntity);
+ serviceDesiredStateEntity = serviceDesiredStateDAO.merge(serviceDesiredStateEntity);
+ }
+
+ @Transactional
private void saveIfPersisted() {
if (isPersisted()) {
clusterServiceDAO.merge(serviceEntity);
@@ -377,6 +388,7 @@ public class ServiceImpl implements Serv
}
@Override
+ @Transactional
public synchronized void refresh() {
if (isPersisted()) {
ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java Wed Nov 7 08:13:12 2012
@@ -21,12 +21,17 @@ package org.apache.ambari.server.state;
import java.util.ArrayList;
import java.util.List;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.map.annotate.JsonFilter;
+
+@JsonFilter("propertiesfilter")
public class ServiceInfo {
private String name;
private String version;
private String user;
private String comment;
private List<PropertyInfo> properties;
+ private List<ComponentInfo> components;
public String getName() {
return name;
@@ -60,24 +65,44 @@ public class ServiceInfo {
this.comment = comment;
}
- public synchronized List<PropertyInfo> getProperties() {
+ public List<PropertyInfo> getProperties() {
if (properties == null) properties = new ArrayList<PropertyInfo>();
return properties;
}
- public void setProperties(List<PropertyInfo> properties) {
- this.properties = properties;
+ public List<ComponentInfo> getComponents() {
+ if (components == null) components = new ArrayList<ComponentInfo>();
+ return components;
}
+ public ComponentInfo getClientComponent() {
+ if (components == null || components.isEmpty()) {
+ return null;
+ }
+ for (ComponentInfo compInfo : components) {
+ if (compInfo.isClient()) {
+ return compInfo;
+ }
+ }
+ return components.get(0);
+ }
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Service name:" + name + "\nversion:" + version + "\nuser:" + user + "\ncomment:" + comment);
+ sb.append("Service name:" + name + "\nversion:" + version +
+ "\nuser:" + user + "\ncomment:" + comment);
// if(properties != null)
// for (PropertyInfo property : properties) {
-// sb.append("\tProperty name=" + property.getName() + "\nproperty value=" + property.getValue() + "\ndescription=" + property.getDescription());
+// sb.append("\tProperty name=" + property.getName() +
+ //"\nproperty value=" + property.getValue() + "\ndescription=" + property.getDescription());
// }
+ for(ComponentInfo component : components){
+ sb.append("\n\n\nComponent:\n");
+ sb.append("name="+ component.getName());
+ sb.append("\tcategory="+ component.getCategory() );
+ }
+
return sb.toString();
}
}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackId.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackId.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackId.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackId.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state;
+
+public class StackId {
+ private String stackName;
+ private String stackVersion;
+
+ public StackId() {
+ super();
+ this.stackName = "";
+ this.stackVersion = "";
+ }
+
+ public StackId(String stackId) {
+ super();
+ parseStackIdHelper(this, stackId);
+ }
+
+ public StackId(StackInfo stackInfo) {
+ this.stackName = stackInfo.getName();
+ this.stackVersion = stackInfo.getVersion();
+ }
+
+ /**
+ * @return the stackName
+ */
+ public String getStackName() {
+ return stackName;
+ }
+
+ /**
+ * @return the stackVersion
+ */
+ public String getStackVersion() {
+ return stackVersion;
+ }
+
+ /**
+ * @return the stackVersion
+ */
+ public String getStackId() {
+ if (stackName.isEmpty()
+ && stackVersion.isEmpty()) {
+ return "";
+ }
+ return stackName + "-" + stackVersion;
+ }
+
+ /**
+ * @param stackId the stackVersion to set
+ */
+ public void setStackId(String stackId) {
+ parseStackIdHelper(this, stackId);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (!(object instanceof StackId)) {
+ return false;
+ }
+ if (this == object) {
+ return true;
+ }
+ StackId s = (StackId) object;
+ return stackVersion.equals(s.stackVersion);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = stackVersion != null ? stackVersion.hashCode() : 0;
+ return result;
+ }
+
+ public String toString() {
+ return this.stackVersion;
+ }
+
+ public static void parseStackIdHelper(StackId stackVersion,
+ String stackId) {
+ if (stackId.isEmpty()) {
+ stackVersion.stackName = "";
+ stackVersion.stackVersion = "";
+ return;
+ }
+ int pos = stackId.indexOf('-');
+ if (pos == -1
+ || (stackId.length() <= (pos+1))) {
+ throw new RuntimeException("Could not parse invalid Stack Id"
+ + ", stackId=" + stackId);
+ }
+ stackVersion.stackName = stackId.substring(0, pos);
+ stackVersion.stackVersion = stackId.substring(pos+1);
+ }
+
+}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackInfo.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackInfo.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackInfo.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/StackInfo.java Wed Nov 7 08:13:12 2012
@@ -57,7 +57,7 @@ public class StackInfo {
return services;
}
- public void setServices(List<ServiceInfo> services) {
+ public synchronized void setServices(List<ServiceInfo> services) {
this.services = services;
}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/Action.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/Action.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/Action.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/Action.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+
+public interface Action {
+
+ /**
+ * Get the Action ID for the action
+ * @return ActionId
+ */
+ public ActionId getId();
+
+ // TODO requires some form of ActionType to ensure only one running
+ // action per action type
+ // There may be gotchas such as de-commissioning should be allowed to happen
+ // on more than one host at a time
+
+
+ /**
+ * Get Start Time of the action
+ * @return Start time as a unix timestamp
+ */
+ public long getStartTime();
+
+ /**
+ * Get the last update time of the Action when its progress status
+ * was updated
+ * @return Last Update Time as a unix timestamp
+ */
+ public long getLastUpdateTime();
+
+ /**
+ * Time when the Action completed
+ * @return Completion Time as a unix timestamp
+ */
+ public long getCompletionTime();
+
+ /**
+ * Get the current state of the Action
+ * @return ActionState
+ */
+ public ActionState getState();
+
+ /**
+ * Set the State of the Action
+ * @param state ActionState
+ */
+ public void setState(ActionState state);
+
+ /**
+ * Send a ActionEvent to the Action's StateMachine
+ * @param event ActionEvent
+ * @throws InvalidStateTransitionException
+ */
+ public void handleEvent(ActionEvent event)
+ throws InvalidStateTransitionException;
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionCompletedEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionCompletedEvent.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionCompletedEvent.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionCompletedEvent.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+public class ActionCompletedEvent extends ActionEvent {
+
+ private final long completionTime;
+
+ // TODO
+ // need to add action report
+
+ public ActionCompletedEvent(ActionId actionId, long completionTime) {
+ super(ActionEventType.ACTION_COMPLETED, actionId);
+ this.completionTime = completionTime;
+ }
+
+ /**
+ * @return the completionTime
+ */
+ public long getCompletionTime() {
+ return completionTime;
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEvent.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEvent.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEvent.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+import org.apache.ambari.server.state.fsm.event.AbstractEvent;
+
+/**
+ * Base class for all events that affect the Action FSM
+ */
+public abstract class ActionEvent extends AbstractEvent<ActionEventType> {
+
+ /**
+ * ActionId identifying the action
+ */
+ private final ActionId actionId;
+
+ public ActionEvent(ActionEventType type, ActionId actionId) {
+ super(type);
+ this.actionId = actionId;
+ }
+
+ /**
+ * @return the actionId
+ */
+ public ActionId getActionId() {
+ return actionId;
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEventType.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEventType.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEventType.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionEventType.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+public enum ActionEventType {
+ /**
+ * Initial state for the Action when triggered.
+ */
+ ACTION_INIT,
+ /**
+ * Action still in progress.
+ */
+ ACTION_IN_PROGRESS,
+ /**
+ * Action completed successfully.
+ */
+ ACTION_COMPLETED,
+ /**
+ * Action failed to complete successfully.
+ */
+ ACTION_FAILED
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionFailedEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionFailedEvent.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionFailedEvent.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionFailedEvent.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+public class ActionFailedEvent extends ActionEvent {
+
+ private final long completionTime;
+
+ // TODO
+ // need to add action report
+
+ public ActionFailedEvent(ActionId actionId, long completionTime) {
+ super(ActionEventType.ACTION_FAILED, actionId);
+ this.completionTime = completionTime;
+ }
+
+ /**
+ * @return the completionTime
+ */
+ public long getCompletionTime() {
+ return completionTime;
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionId.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionId.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionId.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionId.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+// TODO
+public class ActionId {
+
+ final long actionId;
+
+ final ActionType actionType;
+
+ public ActionId(long actionId, ActionType actionType) {
+ super();
+ this.actionId = actionId;
+ this.actionType = actionType;
+ }
+
+ public String toString() {
+ return "[ actionId=" + actionId
+ + ", actionType=" + actionType + "]";
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionImpl.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionImpl.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionImpl.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.fsm.SingleArcTransition;
+import org.apache.ambari.server.state.fsm.StateMachine;
+import org.apache.ambari.server.state.fsm.StateMachineFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ActionImpl implements Action {
+
+ private static final Log LOG = LogFactory.getLog(ActionImpl.class);
+
+ private final Lock readLock;
+ private final Lock writeLock;
+
+ private ActionId id;
+
+ private long startTime;
+ private long lastUpdateTime;
+ private long completionTime;
+
+ // TODO
+ // need to add action report
+
+ private static final StateMachineFactory
+ <ActionImpl, ActionState, ActionEventType, ActionEvent>
+ stateMachineFactory
+ = new StateMachineFactory<ActionImpl, ActionState,
+ ActionEventType, ActionEvent>
+ (ActionState.INIT)
+
+ // define the state machine of a Action
+
+ .addTransition(ActionState.INIT, ActionState.IN_PROGRESS,
+ ActionEventType.ACTION_IN_PROGRESS, new ActionProgressUpdateTransition())
+ .addTransition(ActionState.INIT, ActionState.COMPLETED,
+ ActionEventType.ACTION_COMPLETED, new ActionCompletedTransition())
+ .addTransition(ActionState.INIT, ActionState.FAILED,
+ ActionEventType.ACTION_FAILED, new ActionFailedTransition())
+ .addTransition(ActionState.INIT, ActionState.IN_PROGRESS,
+ ActionEventType.ACTION_IN_PROGRESS, new ActionProgressUpdateTransition())
+ .addTransition(ActionState.IN_PROGRESS, ActionState.IN_PROGRESS,
+ ActionEventType.ACTION_IN_PROGRESS, new ActionProgressUpdateTransition())
+ .addTransition(ActionState.IN_PROGRESS, ActionState.COMPLETED,
+ ActionEventType.ACTION_COMPLETED, new ActionCompletedTransition())
+ .addTransition(ActionState.IN_PROGRESS, ActionState.FAILED,
+ ActionEventType.ACTION_FAILED, new ActionFailedTransition())
+ .addTransition(ActionState.COMPLETED, ActionState.INIT,
+ ActionEventType.ACTION_INIT, new NewActionTransition())
+ .addTransition(ActionState.FAILED, ActionState.INIT,
+ ActionEventType.ACTION_INIT, new NewActionTransition())
+ .installTopology();
+
+ private final StateMachine<ActionState, ActionEventType, ActionEvent>
+ stateMachine;
+
+ public ActionImpl(ActionId id, long startTime) {
+ super();
+ this.id = id;
+ this.stateMachine = stateMachineFactory.make(this);
+ ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ this.readLock = rwLock.readLock();
+ this.writeLock = rwLock.writeLock();
+ this.startTime = startTime;
+ this.lastUpdateTime = -1;
+ this.completionTime = -1;
+ }
+
+ private void reset() {
+ try {
+ writeLock.lock();
+ this.startTime = -1;
+ this.lastUpdateTime = -1;
+ this.completionTime = -1;
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ static class NewActionTransition
+ implements SingleArcTransition<ActionImpl, ActionEvent> {
+
+ @Override
+ public void transition(ActionImpl action, ActionEvent event) {
+ ActionInitEvent e = (ActionInitEvent) event;
+ // TODO audit logs
+ action.reset();
+ action.setId(e.getActionId());
+ action.setStartTime(e.getStartTime());
+ LOG.info("Launching a new Action"
+ + ", actionId=" + action.getId()
+ + ", startTime=" + action.getStartTime());
+ }
+ }
+
+ static class ActionProgressUpdateTransition
+ implements SingleArcTransition<ActionImpl, ActionEvent> {
+
+ @Override
+ public void transition(ActionImpl action, ActionEvent event) {
+ ActionProgressUpdateEvent e = (ActionProgressUpdateEvent) event;
+ action.setLastUpdateTime(e.getProgressUpdateTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Progress update for Action"
+ + ", actionId=" + action.getId()
+ + ", startTime=" + action.getStartTime()
+ + ", lastUpdateTime=" + action.getLastUpdateTime());
+ }
+ }
+ }
+
+ static class ActionCompletedTransition
+ implements SingleArcTransition<ActionImpl, ActionEvent> {
+
+ @Override
+ public void transition(ActionImpl action, ActionEvent event) {
+ // TODO audit logs
+ ActionCompletedEvent e = (ActionCompletedEvent) event;
+ action.setCompletionTime(e.getCompletionTime());
+ action.setLastUpdateTime(e.getCompletionTime());
+
+ LOG.info("Action completed successfully"
+ + ", actionId=" + action.getId()
+ + ", startTime=" + action.getStartTime()
+ + ", completionTime=" + action.getCompletionTime());
+ }
+ }
+
+ static class ActionFailedTransition
+ implements SingleArcTransition<ActionImpl, ActionEvent> {
+
+ @Override
+ public void transition(ActionImpl action, ActionEvent event) {
+ // TODO audit logs
+ ActionFailedEvent e = (ActionFailedEvent) event;
+ action.setCompletionTime(e.getCompletionTime());
+ action.setLastUpdateTime(e.getCompletionTime());
+ LOG.info("Action failed to complete"
+ + ", actionId=" + action.getId()
+ + ", startTime=" + action.getStartTime()
+ + ", completionTime=" + action.getCompletionTime());
+ }
+ }
+
+
+ @Override
+ public ActionState getState() {
+ try {
+ readLock.lock();
+ return stateMachine.getCurrentState();
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void setState(ActionState state) {
+ try {
+ writeLock.lock();
+ stateMachine.setCurrentState(state);
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void handleEvent(ActionEvent event)
+ throws InvalidStateTransitionException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handling Action event, eventType=" + event.getType().name()
+ + ", event=" + event.toString());
+ }
+ ActionState oldState = getState();
+ try {
+ writeLock.lock();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitionException e) {
+ LOG.error("Can't handle Action event at current state"
+ + ", actionId=" + this.getId()
+ + ", currentState=" + oldState
+ + ", eventType=" + event.getType()
+ + ", event=" + event);
+ throw e;
+ }
+ }
+ finally {
+ writeLock.unlock();
+ }
+ if (oldState != getState()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Action transitioned to a new state"
+ + ", actionId=" + this.getId()
+ + ", oldState=" + oldState
+ + ", currentState=" + getState()
+ + ", eventType=" + event.getType().name()
+ + ", event=" + event);
+ }
+ }
+ }
+
+ @Override
+ public ActionId getId() {
+ try {
+ readLock.lock();
+ return id;
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
+ private void setId(ActionId id) {
+ try {
+ writeLock.lock();
+ this.id = id;
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public long getStartTime() {
+ try {
+ readLock.lock();
+ return startTime;
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
+ public void setStartTime(long startTime) {
+ try {
+ writeLock.lock();
+ this.startTime = startTime;
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public long getLastUpdateTime() {
+ try {
+ readLock.lock();
+ return lastUpdateTime;
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
+ public void setLastUpdateTime(long lastUpdateTime) {
+ try {
+ writeLock.lock();
+ this.lastUpdateTime = lastUpdateTime;
+ }
+ finally {
+ writeLock.unlock();
+ }
+
+ }
+
+ @Override
+ public long getCompletionTime() {
+ try {
+ readLock.lock();
+ return completionTime;
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
+ public void setCompletionTime(long completionTime) {
+ try {
+ writeLock.lock();
+ this.completionTime = completionTime;
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionInitEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionInitEvent.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionInitEvent.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionInitEvent.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+public class ActionInitEvent extends ActionEvent {
+
+ private final long startTime;
+
+ public ActionInitEvent(ActionId actionId, long startTime) {
+ super(ActionEventType.ACTION_INIT, actionId);
+ this.startTime = startTime;
+ }
+
+ /**
+ * @return the start time of the Action
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionProgressUpdateEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionProgressUpdateEvent.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionProgressUpdateEvent.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionProgressUpdateEvent.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+public class ActionProgressUpdateEvent extends ActionEvent {
+
+ private final long progressUpdateTime;
+
+ public ActionProgressUpdateEvent(ActionId actionId, long progressUpdateTime) {
+ super(ActionEventType.ACTION_IN_PROGRESS, actionId);
+ this.progressUpdateTime = progressUpdateTime;
+ }
+
+ /**
+ * @return the progressUpdateTime
+ */
+ public long getProgressUpdateTime() {
+ return progressUpdateTime;
+ }
+
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionState.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionState.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionState.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionState.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+public enum ActionState {
+ /**
+ * Initial state for the Action.
+ * When a new action is triggered or set in motion.
+ */
+ INIT,
+ /**
+ * State when the Action is triggered on the cluster,
+ */
+ IN_PROGRESS,
+ /**
+ * State of successful completion
+ */
+ COMPLETED,
+ /**
+ * Action failed to complete successfully
+ */
+ FAILED
+}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionType.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionType.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionType.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/action/ActionType.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.action;
+
+public class ActionType {
+
+ public final String actionName;
+
+ public ActionType(String actionName) {
+ super();
+ this.actionName = actionName;
+ }
+
+}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java Wed Nov 7 08:13:12 2012
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.inject.persist.Transactional;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceNotFoundException;
@@ -47,7 +48,7 @@ import org.apache.ambari.server.state.Co
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceFactory;
-import org.apache.ambari.server.state.StackVersion;
+import org.apache.ambari.server.state.StackId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +64,7 @@ public class ClusterImpl implements Clus
@Inject
private Clusters clusters;
- private StackVersion desiredStackVersion;
+ private StackId desiredStackVersion;
private Map<String, Service> services = new TreeMap<String, Service>();
@@ -118,7 +119,7 @@ public class ClusterImpl implements Clus
Map<String,Map<String,ServiceComponentHost>>>();
this.serviceComponentHostsByHost = new HashMap<String,
List<ServiceComponentHost>>();
- this.desiredStackVersion = new StackVersion("");
+ this.desiredStackVersion = new StackId();
configs = new HashMap<String, Map<String,Config>>();
if (!clusterEntity.getClusterConfigEntities().isEmpty()) {
@@ -307,12 +308,12 @@ public class ClusterImpl implements Clus
}
@Override
- public synchronized StackVersion getDesiredStackVersion() {
+ public synchronized StackId getDesiredStackVersion() {
return desiredStackVersion;
}
@Override
- public synchronized void setDesiredStackVersion(StackVersion stackVersion) {
+ public synchronized void setDesiredStackVersion(StackId stackVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("Changing DesiredStackVersion of Cluster"
+ ", clusterName=" + getClusterName()
@@ -369,14 +370,15 @@ public class ClusterImpl implements Clus
public synchronized ClusterResponse convertToResponse()
throws AmbariException {
ClusterResponse r = new ClusterResponse(getClusterId(), getClusterName(),
- clusters.getHostsForCluster(getClusterName()).keySet());
+ clusters.getHostsForCluster(getClusterName()).keySet(),
+ getDesiredStackVersion().getStackId());
return r;
}
public void debugDump(StringBuilder sb) {
sb.append("Cluster={ clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId()
- + ", desiredStackVersion=" + desiredStackVersion.getStackVersion()
+ + ", desiredStackVersion=" + desiredStackVersion.getStackId()
+ ", services=[ ");
boolean first = true;
for(Service s : services.values()) {
@@ -392,6 +394,7 @@ public class ClusterImpl implements Clus
}
@Override
+ @Transactional
public synchronized void refresh() {
clusterEntity = clusterDAO.findById(clusterEntity.getClusterId());
clusterDAO.refresh(clusterEntity);
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java Wed Nov 7 08:13:12 2012
@@ -26,9 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
+import javax.persistence.RollbackException;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ClusterNotFoundException;
import org.apache.ambari.server.HostNotFoundException;
@@ -49,7 +48,9 @@ import org.apache.ambari.server.state.ho
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.persistence.RollbackException;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
@Singleton
public class ClustersImpl implements Clusters {
@@ -134,6 +135,8 @@ public class ClustersImpl implements Clu
Cluster cluster = clusterFactory.create(clusterEntity);
clustersById.put(cluster.getClusterId(), cluster);
clusters.put(clusterEntity.getClusterName(), cluster);
+ if (!clusterHostMap.containsKey(clusterEntity.getClusterName()))
+ clusterHostMap.put(clusterEntity.getClusterName(), new HashSet<Host>());
} else {
throw new ClusterNotFoundException("clusterID=" + id);
}
@@ -236,7 +239,9 @@ public class ClustersImpl implements Clu
host.addToCluster(cluster);
hostClusterMap.get(hostname).add(cluster);
+
clusterHostMap.get(clusterName).add(host);
+
if (LOG.isDebugEnabled()) {
LOG.debug("Mapping a host to a cluster"
+ ", clusterName=" + clusterName
@@ -291,11 +296,15 @@ public class ClustersImpl implements Clu
@Override
public Map<String, Host> getHostsForCluster(String clusterName)
throws AmbariException {
+
getCluster(clusterName);
+
Map<String, Host> hosts = new HashMap<String, Host>();
+
for (Host h : clusterHostMap.get(clusterName)) {
hosts.put(h.getHostName(), h);
}
+
return hosts;
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java Wed Nov 7 08:13:12 2012
@@ -45,7 +45,6 @@ import org.apache.ambari.server.state.fs
import org.apache.ambari.server.state.fsm.SingleArcTransition;
import org.apache.ambari.server.state.fsm.StateMachine;
import org.apache.ambari.server.state.fsm.StateMachineFactory;
-import org.apache.ambari.server.state.job.Job;
import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.commons.logging.Log;
@@ -204,6 +203,15 @@ public class HostImpl implements Host {
host.importHostInfo(e.hostInfo);
host.setLastRegistrationTime(e.registrationTime);
host.setAgentVersion(e.agentVersion);
+
+ String agentVersion = null;
+ if (e.agentVersion != null) {
+ agentVersion = e.agentVersion.getVersion();
+ }
+ LOG.info("Received host registration, host="
+ + e.hostInfo.toString()
+ + ", registrationTime=" + e.registrationTime
+ + ", agentVersion=" + agentVersion);
host.persist();
}
}
@@ -320,7 +328,14 @@ public class HostImpl implements Host {
if (hostInfo.getOS() != null
&& !hostInfo.getOS().isEmpty()) {
- setOsType(hostInfo.getOS());
+ String osType = hostInfo.getOS();
+ if (hostInfo.getOSRelease() != null) {
+ int pos = hostInfo.getOSRelease().indexOf('.');
+ if (pos > 0) {
+ osType += hostInfo.getOSRelease().substring(0, pos);
+ }
+ }
+ setOsType(osType.toLowerCase());
}
if (hostInfo.getMounts() != null
@@ -749,12 +764,6 @@ public class HostImpl implements Host {
}
@Override
- public List<Job> getJobs() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public long getTimeInState() {
// TODO Auto-generated method stub
return 0;
@@ -826,38 +835,43 @@ public class HostImpl implements Host {
* Save host to database and make all changes to be saved afterwards
*/
@Override
- @Transactional
public void persist() {
try {
writeLock.lock();
if (!persisted) {
- hostDAO.create(hostEntity);
- hostStateDAO.create(hostStateEntity);
- if (!hostEntity.getClusterEntities().isEmpty()) {
- for (ClusterEntity clusterEntity : hostEntity.getClusterEntities()) {
- clusterEntity.getHostEntities().add(hostEntity);
- clusterDAO.merge(clusterEntity);
- try {
- clusters.getClusterById(clusterEntity.getClusterId()).refresh();
- } catch (AmbariException e) {
- LOG.error(e);
- throw new RuntimeException("Cluster '" + clusterEntity.getClusterId() + "' was removed", e);
- }
+ persistEntities();
+ refresh();
+ for (ClusterEntity clusterEntity : hostEntity.getClusterEntities()) {
+ try {
+ clusters.getClusterById(clusterEntity.getClusterId()).refresh();
+ } catch (AmbariException e) {
+ LOG.error(e);
+ throw new RuntimeException("Cluster '" + clusterEntity.getClusterId() + "' was removed", e);
}
}
- hostEntity = hostDAO.merge(hostEntity);
- hostStateEntity = hostStateDAO.merge(hostStateEntity);
persisted = true;
} else {
- hostDAO.merge(hostEntity);
- hostStateDAO.merge(hostStateEntity);
+ saveIfPersisted();
}
} finally {
writeLock.unlock();
}
}
+ @Transactional
+ protected void persistEntities() {
+ hostDAO.create(hostEntity);
+ hostStateDAO.create(hostStateEntity);
+ if (!hostEntity.getClusterEntities().isEmpty()) {
+ for (ClusterEntity clusterEntity : hostEntity.getClusterEntities()) {
+ clusterEntity.getHostEntities().add(hostEntity);
+ clusterDAO.merge(clusterEntity);
+ }
+ }
+ }
+
@Override
+ @Transactional
public void refresh() {
try {
writeLock.lock();
@@ -872,6 +886,7 @@ public class HostImpl implements Host {
}
}
+ @Transactional
private void saveIfPersisted() {
if (isPersisted()) {
hostDAO.merge(hostEntity);
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java Wed Nov 7 08:13:12 2012
@@ -28,7 +28,6 @@ import org.apache.ambari.server.orm.dao.
import org.apache.ambari.server.orm.entities.*;
import org.apache.ambari.server.state.*;
import org.apache.ambari.server.state.fsm.*;
-import org.apache.ambari.server.state.job.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,6 +125,10 @@ public class ServiceComponentHostImpl im
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
new ServiceComponentHostOpStartedTransition())
+ .addTransition(State.INSTALLED,
+ State.STOPPING,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STOP,
+ new ServiceComponentHostOpStartedTransition())
.addTransition(State.STARTING,
State.STARTING,
@@ -148,6 +151,10 @@ public class ServiceComponentHostImpl im
State.STARTING,
ServiceComponentHostEventType.HOST_SVCCOMP_START,
new ServiceComponentHostOpStartedTransition())
+ .addTransition(State.START_FAILED,
+ State.STOPPING,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STOP,
+ new ServiceComponentHostOpStartedTransition())
.addTransition(State.STARTED,
State.STOPPING,
@@ -442,7 +449,7 @@ public class ServiceComponentHostImpl im
stateEntity.setServiceName(serviceComponent.getServiceName());
stateEntity.setHostName(hostName);
stateEntity.setCurrentState(stateMachine.getCurrentState());
- stateEntity.setCurrentStackVersion(gson.toJson(new StackVersion("")));
+ stateEntity.setCurrentStackVersion(gson.toJson(new StackId()));
desiredStateEntity = new HostComponentDesiredStateEntity();
desiredStateEntity.setClusterId(serviceComponent.getClusterId());
@@ -450,7 +457,8 @@ public class ServiceComponentHostImpl im
desiredStateEntity.setServiceName(serviceComponent.getServiceName());
desiredStateEntity.setHostName(hostName);
desiredStateEntity.setDesiredState(State.INIT);
- desiredStateEntity.setDesiredStackVersion(gson.toJson(new StackVersion("")));
+ desiredStateEntity.setDesiredStackVersion(
+ gson.toJson(serviceComponent.getDesiredStackVersion()));
try {
this.host = clusters.getHost(hostName);
@@ -567,12 +575,6 @@ public class ServiceComponentHostImpl im
}
@Override
- public List<Job> getJobs() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public String getServiceComponentName() {
return serviceComponent.getName();
}
@@ -696,10 +698,10 @@ public class ServiceComponentHostImpl im
}
@Override
- public StackVersion getStackVersion() {
+ public StackId getStackVersion() {
try {
readLock.lock();
- return gson.fromJson(stateEntity.getCurrentStackVersion(), StackVersion.class);
+ return gson.fromJson(stateEntity.getCurrentStackVersion(), StackId.class);
}
finally {
readLock.unlock();
@@ -707,7 +709,7 @@ public class ServiceComponentHostImpl im
}
@Override
- public void setStackVersion(StackVersion stackVersion) {
+ public void setStackVersion(StackId stackVersion) {
try {
writeLock.lock();
stateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
@@ -809,10 +811,10 @@ public class ServiceComponentHostImpl im
}
@Override
- public StackVersion getDesiredStackVersion() {
+ public StackId getDesiredStackVersion() {
try {
readLock.lock();
- return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackVersion.class);
+ return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackId.class);
}
finally {
readLock.unlock();
@@ -820,7 +822,7 @@ public class ServiceComponentHostImpl im
}
@Override
- public void setDesiredStackVersion(StackVersion stackVersion) {
+ public void setDesiredStackVersion(StackId stackVersion) {
try {
writeLock.lock();
desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion));
@@ -856,7 +858,7 @@ public class ServiceComponentHostImpl im
getHostName(),
getConfigVersions(),
getState().toString(),
- getStackVersion().getStackVersion(),
+ getStackVersion().getStackId(),
getDesiredState().toString());
return r;
}
@@ -900,36 +902,12 @@ public class ServiceComponentHostImpl im
}
@Override
- @Transactional
public void persist() {
try {
writeLock.lock();
if (!persisted) {
- HostEntity hostEntity = hostDAO.findByName(getHostName());
- hostEntity.getHostComponentStateEntities().add(stateEntity);
- hostEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
-
- ServiceComponentDesiredStateEntityPK dpk = new ServiceComponentDesiredStateEntityPK();
- dpk.setClusterId(serviceComponent.getClusterId());
- dpk.setServiceName(serviceComponent.getServiceName());
- dpk.setComponentName(serviceComponent.getName());
-
- ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity = serviceComponentDesiredStateDAO.findByPK(dpk);
- serviceComponentDesiredStateEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
-
- desiredStateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
- desiredStateEntity.setHostEntity(hostEntity);
- stateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
- stateEntity.setHostEntity(hostEntity);
-
- hostComponentStateDAO.create(stateEntity);
- hostComponentDesiredStateDAO.create(desiredStateEntity);
-
- serviceComponentDesiredStateDAO.merge(serviceComponentDesiredStateEntity);
- hostDAO.merge(hostEntity);
- stateEntity = hostComponentStateDAO.merge(stateEntity);
- desiredStateEntity = hostComponentDesiredStateDAO.merge(desiredStateEntity);
-
+ persistEntities();
+ refresh();
host.refresh();
serviceComponent.refresh();
persisted = true;
@@ -941,7 +919,34 @@ public class ServiceComponentHostImpl im
}
}
+ @Transactional
+ protected void persistEntities() {
+ HostEntity hostEntity = hostDAO.findByName(getHostName());
+ hostEntity.getHostComponentStateEntities().add(stateEntity);
+ hostEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
+
+ ServiceComponentDesiredStateEntityPK dpk = new ServiceComponentDesiredStateEntityPK();
+ dpk.setClusterId(serviceComponent.getClusterId());
+ dpk.setServiceName(serviceComponent.getServiceName());
+ dpk.setComponentName(serviceComponent.getName());
+
+ ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity = serviceComponentDesiredStateDAO.findByPK(dpk);
+ serviceComponentDesiredStateEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
+
+ desiredStateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
+ desiredStateEntity.setHostEntity(hostEntity);
+ stateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
+ stateEntity.setHostEntity(hostEntity);
+
+ hostComponentStateDAO.create(stateEntity);
+ hostComponentDesiredStateDAO.create(desiredStateEntity);
+
+ serviceComponentDesiredStateDAO.merge(serviceComponentDesiredStateEntity);
+ hostDAO.merge(hostEntity);
+ }
+
@Override
+ @Transactional
public synchronized void refresh() {
if (isPersisted()) {
HostComponentStateEntityPK pk = new HostComponentStateEntityPK();
@@ -961,6 +966,7 @@ public class ServiceComponentHostImpl im
}
}
+ @Transactional
private void saveIfPersisted() {
if (isPersisted()) {
hostComponentStateDAO.merge(stateEntity);
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/ShellCommandUtil.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/ShellCommandUtil.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/ShellCommandUtil.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/ShellCommandUtil.java Wed Nov 7 08:13:12 2012
@@ -27,11 +27,11 @@ public class ShellCommandUtil {
private static final Log LOG = LogFactory.getLog(ShellCommandUtil.class);
/*
public static String LogAndReturnOpenSslExitCode(String command, int exitCode) {
- LogOpenSslExitCode(command, exitCode);
+ logOpenSslExitCode(command, exitCode);
return getOpenSslCommandResult(command, exitCode);
}
*/
- public static void LogOpenSslExitCode(String command, int exitCode) {
+ public static void logOpenSslExitCode(String command, int exitCode) {
if (exitCode == 0) {
LOG.info(getOpenSslCommandResult(command, exitCode));
} else {
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java Wed Nov 7 08:13:12 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -67,7 +68,7 @@ public class StageUtils {
"templeton_server_host");
componentToClusterInfoKeyMap.put("DASHBOARD", "dashboard_host");
componentToClusterInfoKeyMap.put("NAGIOS_SERVER", "nagios_server_host");
- componentToClusterInfoKeyMap.put("GANGLIA_MONITOR_SERVER",
+ componentToClusterInfoKeyMap.put("GANGLIA_SERVER",
"ganglia_server_host");
componentToClusterInfoKeyMap.put("DATANODE", "slave_hosts");
componentToClusterInfoKeyMap.put("TASKTRACKER", "slave_hosts");
@@ -143,7 +144,7 @@ public class StageUtils {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, true);
- InputStream is = new ByteArrayInputStream(json.getBytes());
+ InputStream is = new ByteArrayInputStream(json.getBytes(Charset.forName("UTF8")));
return mapper.readValue(is, ExecutionCommand.class);
}
@@ -151,7 +152,7 @@ public class StageUtils {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, true);
- InputStream is = new ByteArrayInputStream(json.getBytes());
+ InputStream is = new ByteArrayInputStream(json.getBytes(Charset.forName("UTF8")));
return mapper.readValue(is, clazz);
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/ambari-server.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/ambari-server.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/ambari-server.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/ambari-server.py Wed Nov 7 08:13:12 2012
@@ -24,6 +24,9 @@ import os
import signal
import subprocess
import re
+import urllib
+import string
+import glob
SETUP_ACTION = "setup"
START_ACTION = "start"
@@ -37,13 +40,19 @@ IP_TBLS_STOP_CMD = "service iptables sto
IP_TBLS_ENABLED="Firewall is running"
IP_TBLS_DISABLED="Firewall is not running"
IP_TBLS_SRVC_NT_FND="iptables: unrecognized service"
-SERVER_START_CMD="java -cp {0}"+ os.pathsep + ".." + os.sep + "lib" + os.sep + "ambari-server" + os.sep + "* org.apache.ambari.server.controller.AmbariServer"
+SERVER_START_CMD="{0}" + os.sep + "bin" + os.sep + "java -cp {1}"+ os.pathsep + ".." + os.sep + "lib" + os.sep + "ambari-server" + os.sep + "* org.apache.ambari.server.controller.AmbariServer"
AMBARI_CONF_VAR="AMBARI_CONF_DIR"
PG_ST_CMD = "service postgresql status"
PG_START_CMD = "service postgresql start"
PG_STATUS_RUNNING = "running"
PID_DIR="/var/run/ambari-server"
PID_NAME="ambari-server.pid"
+AMBARI_PROPERTIES_FILE="ambari.properties"
+JDK_LOCAL_FILENAME="jdk-distr.bin"
+JDK_MIN_FILESIZE=5000
+JDK_INSTALL_DIR="/usr/jdk64"
+CREATE_JDK_DIR_CMD = "mkdir -p " + JDK_INSTALL_DIR
+MAKE_FILE_EXECUTABLE_CMD = "chmod a+x {0}"
def run_os_command(cmd):
print 'about to run command: ' + cmd
@@ -54,7 +63,7 @@ def run_os_command(cmd):
)
(stdoutdata, stderrdata) = process.communicate()
return process.returncode, stdoutdata, stderrdata
-
+# todo: check if the scheme is already exist
def setup_db(args):
dbname = args.postgredbname
file = args.init_script_file
@@ -73,6 +82,21 @@ def check_se_down():
else:
return 1
+def get_conf_dir():
+ try:
+ conf_dir = os.environ[AMBARI_CONF_VAR]
+ return conf_dir
+ except KeyError:
+ print "Please set value of " + AMBARI_CONF_VAR + "!"
+ sys.exit(1)
+
+def search_file(filename, search_path, pathsep=os.pathsep):
+ """ Given a search path, find file with requested name """
+ for path in string.split(search_path, pathsep):
+ candidate = os.path.join(path, filename)
+ if os.path.exists(candidate): return os.path.abspath(candidate)
+ return None
+
def ip_tables_down():
retcode, out, err = run_os_command(IP_TBLS_ST_CMD)
if out == IP_TBLS_ENABLED:
@@ -87,7 +111,173 @@ def ip_tables_down():
if err.strip() == IP_TBLS_SRVC_NT_FND:
return 0
else:
- return retcode
+ return retcode, out
+
+
+# A Python replacement for java.util.Properties
+# Based on http://code.activestate.com/recipes/496795-a-python-replacement-for-javautilproperties/
+class Properties(object):
+
+ def __init__(self, props=None):
+
+ self._props = {}
+ self._origprops = {}
+ self._keymap = {}
+
+ self.othercharre = re.compile(r'(?<!\\)(\s*\=)|(?<!\\)(\s*\:)')
+ self.othercharre2 = re.compile(r'(\s*\=)|(\s*\:)')
+ self.bspacere = re.compile(r'\\(?!\s$)')
+
+ def __parse(self, lines):
+ lineno=0
+ i = iter(lines)
+ for line in i:
+ lineno += 1
+ line = line.strip()
+ if not line: continue
+ if line[0] == '#': continue
+ escaped=False
+ sepidx = -1
+ flag = 0
+ m = self.othercharre.search(line)
+ if m:
+ first, last = m.span()
+ start, end = 0, first
+ flag = 1
+ wspacere = re.compile(r'(?<![\\\=\:])(\s)')
+ else:
+ if self.othercharre2.search(line):
+ wspacere = re.compile(r'(?<![\\])(\s)')
+ start, end = 0, len(line)
+ m2 = wspacere.search(line, start, end)
+ if m2:
+ first, last = m2.span()
+ sepidx = first
+ elif m:
+ first, last = m.span()
+ sepidx = last - 1
+ while line[-1] == '\\':
+ nextline = i.next()
+ nextline = nextline.strip()
+ lineno += 1
+ line = line[:-1] + nextline
+ if sepidx != -1:
+ key, value = line[:sepidx], line[sepidx+1:]
+ else:
+ key,value = line,''
+ self.processPair(key, value)
+
+ def processPair(self, key, value):
+ oldkey = key
+ oldvalue = value
+ keyparts = self.bspacere.split(key)
+ strippable = False
+ lastpart = keyparts[-1]
+ if lastpart.find('\\ ') != -1:
+ keyparts[-1] = lastpart.replace('\\','')
+ elif lastpart and lastpart[-1] == ' ':
+ strippable = True
+ key = ''.join(keyparts)
+ if strippable:
+ key = key.strip()
+ oldkey = oldkey.strip()
+ oldvalue = self.unescape(oldvalue)
+ value = self.unescape(value)
+ self._props[key] = value.strip()
+ if self._keymap.has_key(key):
+ oldkey = self._keymap.get(key)
+ self._origprops[oldkey] = oldvalue.strip()
+ else:
+ self._origprops[oldkey] = oldvalue.strip()
+ self._keymap[key] = oldkey
+
+ def unescape(self, value):
+ newvalue = value.replace('\:',':')
+ newvalue = newvalue.replace('\=','=')
+ return newvalue
+
+ def load(self, stream):
+ if type(stream) is not file:
+ raise TypeError,'Argument should be a file object!'
+ if stream.mode != 'r':
+ raise ValueError,'Stream should be opened in read-only mode!'
+ try:
+ lines = stream.readlines()
+ self.__parse(lines)
+ except IOError, e:
+ raise
+
+ def getProperty(self, key):
+ return self._props.get(key,'')
+
+ def propertyNames(self):
+ return self._props.keys()
+
+ def getPropertyDict(self):
+ return self._props
+
+ def __getitem__(self, name):
+ return self.getProperty(name)
+
+ def __getattr__(self, name):
+ try:
+ return self.__dict__[name]
+ except KeyError:
+ if hasattr(self._props,name):
+ return getattr(self._props, name)
+
+
+def download_jdk():
+ conf_file = search_file(AMBARI_PROPERTIES_FILE, get_conf_dir())
+ if conf_file is None:
+ print 'File %s not found in search path $%s: %s' % (AMBARI_PROPERTIES_FILE, AMBARI_CONF_VAR, get_conf_dir())
+ return -1
+ print 'Loading properties from ' + conf_file
+ properties = None
+ try:
+ properties = Properties()
+ properties.load(open(conf_file))
+ except (Exception), e:
+ print 'Could not read "%s": %s' % (conf_file, e)
+ return -1
+ try:
+ jdk_url = properties['jdk.url']
+ resources_dir = properties['resources.dir']
+ except (KeyError), e:
+ print 'Property ' + str(e) + ' is not defined at ' + conf_file
+ return -1
+ dest_file = resources_dir + os.sep + JDK_LOCAL_FILENAME
+ print 'Trying to download JDK from ' + jdk_url + ' to ' + dest_file
+ try:
+ src_size = int(urllib.urlopen(jdk_url).info()['Content-Length'])
+ print 'JDK distribution size is ' + str(src_size) + ' bytes'
+ file_exists = os.path.isfile(dest_file)
+ file_size = -1
+ if file_exists:
+ file_size = os.stat(dest_file).st_size
+ if file_exists and file_size == src_size:
+ print "File already exists"
+ else:
+ urllib.urlretrieve (jdk_url, dest_file)
+ print 'Successfully downloaded JDK distribution to ' + dest_file
+ except Exception, e:
+ print 'Failed to download JDK: ' + str(e)
+ return -1
+ downloaded_size = os.stat(dest_file).st_size
+ if downloaded_size != src_size or downloaded_size < JDK_MIN_FILESIZE:
+ print 'Size of downloaded JDK distribution file is ' + str(downloaded_size) + ' bytes, it is probably \
+damaged or incomplete'
+ return -1
+ print "Installing JDK to {0}".format(JDK_INSTALL_DIR)
+ retcode, out, err = run_os_command(CREATE_JDK_DIR_CMD)
+ savedPath = os.getcwd()
+ os.chdir(JDK_INSTALL_DIR)
+ retcode, out, err = run_os_command(MAKE_FILE_EXECUTABLE_CMD.format(dest_file))
+ retcode, out, err = run_os_command(dest_file)
+ os.chdir(savedPath)
+ jdk_version = re.search('Creating (jdk.*)/jre', out).group(1)
+ print "Successfully installed JDK to {0}/{1}".format(JDK_INSTALL_DIR, jdk_version)
+ return 0
def check_postgre_up():
retcode, out, err = run_os_command(PG_ST_CMD)
@@ -124,27 +314,44 @@ def setup(args):
sys.exit(retcode)
print 'About to check iptables down'
- retcode = ip_tables_down()
- if (not retcode == 0):
+ retcode, out = ip_tables_down()
+ if (not retcode == 0 and out == IP_TBLS_ENABLED):
print 'Error! Failed to stop iptables'
sys.exit(retcode)
-
+
+ print 'About to download JDK and install it'
+ retcode = download_jdk()
+ if not retcode == 0:
+ print 'Error! Downloading or installing JDK failed'
+ sys.exit(retcode)
+
print 'Setup was finished sucessfully'
-
+
+def findJDK():
+ print "Looking for available JDKs at " + JDK_INSTALL_DIR
+ jdks = glob.glob(JDK_INSTALL_DIR + os.sep + "jdk*")
+ jdks.sort()
+ print "Found: " + str(jdks)
+ count = len(jdks)
+ if count == 0:
+ return
+ jdkPath = jdks[count - 1]
+ print "Choosed JDK {0}".format(jdkPath)
+ return jdkPath
+
def start(args):
- try:
- conf_dir = os.environ[AMBARI_CONF_VAR]
- except KeyError:
- print "Please set value of " + AMBARI_CONF_VAR + "!"
- sys.exit(1)
-
+ conf_dir = get_conf_dir()
+ jdk_path = findJDK()
+ if jdk_path is None:
+ print "No any JDK found, please run \"setup\" command to install it automatically or "\
+"install any JDK manually to " + JDK_INSTALL_DIR
+ return -1
retcode = check_postgre_up()
if not retcode == 0:
print 'Error! Unable to start postgre server'
sys.exit(retcode)
-
- command = SERVER_START_CMD.format(conf_dir)
-
+ command = SERVER_START_CMD.format(jdk_path, conf_dir)
+ #print "Running server: " + command
server_process = subprocess.Popen(command.split(' '))
f = open(PID_DIR + os.sep + PID_NAME, "w")
f.write(str(server_process.pid))
@@ -156,10 +363,13 @@ def stop(args):
os.kill(pid, signal.SIGKILL)
f.close()
os.remove(f.name)
-
-
-
-
+
+
+
+
+
+
+
def main():
parser = optparse.OptionParser(usage="usage: %prog [options] action",)
@@ -187,3 +397,9 @@ def main():
if __name__ == "__main__":
main()
+
+
+
+
+
+
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/bootstrap.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/bootstrap.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/bootstrap.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/python/bootstrap.py Wed Nov 7 08:13:12 2012
@@ -29,6 +29,8 @@ import threading
import traceback
from pprint import pformat
+AMBARI_PASSPHRASE_VAR = "AMBARI_PASSPHRASE"
+
class SCP(threading.Thread):
""" SCP implementation that is thread based. The status can be returned using
status val """
@@ -62,10 +64,10 @@ class SCP(threading.Thread):
class SSH(threading.Thread):
""" Ssh implementation of this """
- def __init__(self, sshKeyFile, host, command):
+ def __init__(self, sshKeyFile, host, commands):
self.sshKeyFile = sshKeyFile
self.host = host
- self.command = command
+ self.commands = commands
self.ret = {"exitstatus" : -1, "log": "FAILED"}
threading.Thread.__init__(self)
pass
@@ -79,7 +81,7 @@ class SSH(threading.Thread):
def run(self):
sshcommand = ["ssh", "-o", "ConnectTimeOut=3", "-o",
"StrictHostKeyChecking=no", "-i", self.sshKeyFile,
- self.host, self.command]
+ self.host, ";".join(self.commands)]
sshstat = subprocess.Popen(sshcommand, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
log = sshstat.communicate()
@@ -94,10 +96,10 @@ def splitlist(hosts, n):
class PSSH:
"""Run SSH in parallel for a given list of hosts"""
- def __init__(self, hosts, sshKeyFile, command):
+ def __init__(self, hosts, sshKeyFile, commands):
self.hosts = hosts
self.sshKeyFile = sshKeyFile
- self.command = command
+ self.commands = commands
self.ret = {}
pass
@@ -110,11 +112,11 @@ class PSSH:
for chunk in splitlist(self.hosts, 20):
chunkstats = []
for host in chunk:
- ssh = SSH(self.sshKeyFile, host, self.command)
+ ssh = SSH(self.sshKeyFile, host, self.commands)
ssh.start()
chunkstats.append(ssh)
pass
- """ wait for the scp's to complete """
+ """ wait for the ssh's to complete """
for chunkstat in chunkstats:
chunkstat.join()
self.ret[chunkstat.getHost()] = chunkstat.getStatus()
@@ -171,11 +173,19 @@ class BootStrap:
return os.path.join(self.scriptDir, "setupAgent.py")
def runSetupAgent(self):
- pssh = PSSH(self.hostlist, self.sshkeyFile, "/tmp/setupAgent.py")
+ commands = ["export AMBARI_PASSPHRASE=" + os.environ[AMBARI_PASSPHRASE_VAR], "/tmp/setupAgent.py"]
+ pssh = PSSH(self.hostlist, self.sshkeyFile, commands)
pssh.run()
out = pssh.getstatus()
logging.info("Parallel ssh returns " + pprint.pformat(out))
- pass
+
+ """ Test code for setting env var on agent host before starting setupAgent.py
+ commands = ["export AMBARI_PASSPHRASE=" + os.environ[AMBARI_PASSPHRASE_VAR], "set"]
+ pssh = PSSH(self.hostlist, self.sshkeyFile, commands)
+ pssh.run()
+ out = pssh.getstatus()
+ logging.info("Look for AMBARI_PASSPHRASE in out " + pprint.pformat(out))
+ """
def copyNeededFiles(self):
try: