You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by hi...@apache.org on 2012/09/11 23:35:24 UTC
svn commit: r1383625 - in /incubator/ambari/branches/AMBARI-666: ./
ambari-server/src/main/java/org/apache/ambari/server/state/live/job/
ambari-server/src/test/java/org/apache/ambari/server/state/live/node/
Author: hitesh
Date: Tue Sep 11 21:35:23 2012
New Revision: 1383625
URL: http://svn.apache.org/viewvc?rev=1383625&view=rev
Log:
AMBARI-714. Job FSM Impl and tests. (Contributed by hitesh)
Modified:
incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java
Modified: incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt (original)
+++ incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt Tue Sep 11 21:35:23 2012
@@ -12,6 +12,8 @@ AMBARI-666 branch (unreleased changes)
NEW FEATURES
+ AMBARI-714. Job FSM Impl and tests. (hitesh)
+
AMBARI-721. Remove Hardwareprofile class since its not needed anymore.
(mahadev)
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java Tue Sep 11 21:35:23 2012
@@ -4,6 +4,9 @@ public class JobCompletedEvent extends J
private final long completionTime;
+ // TODO
+ // need to add job report
+
public JobCompletedEvent(JobId jobId, long completionTime) {
super(JobEventType.JOB_COMPLETED, jobId);
this.completionTime = completionTime;
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java Tue Sep 11 21:35:23 2012
@@ -5,7 +5,7 @@ public class JobFailedEvent extends JobE
private final long completionTime;
// TODO
- // need to add job failed reason
+ // need to add job report
public JobFailedEvent(JobId jobId, long completionTime) {
super(JobEventType.JOB_FAILED, jobId);
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java Tue Sep 11 21:35:23 2012
@@ -31,5 +31,8 @@ public class JobId {
this.jobType = jobType;
}
-
+ public String toString() {
+ return "[ jobId=" + jobId
+ + ", jobType=" + jobType + "]";
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java Tue Sep 11 21:35:23 2012
@@ -43,7 +43,7 @@ public class JobImpl implements Job {
private long completionTime;
// TODO
- // need to add job failed reason
+ // need to add job report
private static final StateMachineFactory
<JobImpl, JobState, JobEventType, JobEvent>
@@ -56,6 +56,12 @@ public class JobImpl implements Job {
.addTransition(JobState.INIT, JobState.IN_PROGRESS,
JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
+ .addTransition(JobState.INIT, JobState.COMPLETED,
+ JobEventType.JOB_COMPLETED, new JobCompletedTransition())
+ .addTransition(JobState.INIT, JobState.FAILED,
+ JobEventType.JOB_FAILED, new JobFailedTransition())
+ .addTransition(JobState.INIT, JobState.IN_PROGRESS,
+ JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
.addTransition(JobState.IN_PROGRESS, JobState.IN_PROGRESS,
JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
.addTransition(JobState.IN_PROGRESS, JobState.COMPLETED,
@@ -71,16 +77,28 @@ public class JobImpl implements Job {
private final StateMachine<JobState, JobEventType, JobEvent>
stateMachine;
- public JobImpl(JobId id) {
+ public JobImpl(JobId id, long startTime) {
super();
this.id = id;
this.stateMachine = stateMachineFactory.make(this);
ReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
- startTime = -1;
- lastUpdateTime = -1;
- completionTime = -1;
+ this.startTime = startTime;
+ this.lastUpdateTime = -1;
+ this.completionTime = -1;
+ }
+
+ private void reset() {
+ try {
+ writeLock.lock();
+ this.startTime = -1;
+ this.lastUpdateTime = -1;
+ this.completionTime = -1;
+ }
+ finally {
+ writeLock.unlock();
+ }
}
static class NewJobTransition
@@ -90,6 +108,7 @@ public class JobImpl implements Job {
public void transition(JobImpl job, JobEvent event) {
NewJobEvent e = (NewJobEvent) event;
// TODO audit logs
+ job.reset();
job.setId(e.getJobId());
job.setStartTime(e.getStartTime());
LOG.info("Launching a new Job"
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java Tue Sep 11 21:35:23 2012
@@ -2,6 +2,11 @@ package org.apache.ambari.server.state.l
public class JobType {
- public String jobName;
+ public final String jobName;
+
+ public JobType(String jobName) {
+ super();
+ this.jobName = jobName;
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java Tue Sep 11 21:35:23 2012
@@ -4,9 +4,9 @@ public class NewJobEvent extends JobEven
private final long startTime;
- public NewJobEvent(JobId jobId, long creationTime) {
+ public NewJobEvent(JobId jobId, long startTime) {
super(JobEventType.JOB_INIT, jobId);
- this.startTime = creationTime;
+ this.startTime = startTime;
}
/**
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java Tue Sep 11 21:35:23 2012
@@ -23,7 +23,7 @@ public class TestNodeImpl {
mounts.add(new DiskInfo("/dev/sda", "/mnt/disk1",
"5000000", "4000000", "10%", "size"));
info.setMounts(mounts);
-
+
info.setHostName("foo");
info.setInterfaces("fip_4");
info.setArchitecture("os_arch");
@@ -32,7 +32,7 @@ public class TestNodeImpl {
NodeImpl node = new NodeImpl();
node.importNodeInfo(info);
-
+
Assert.assertEquals(info.getHostName(), node.getHostName());
Assert.assertEquals(info.getFreeMemory(), node.getAvailableMemBytes());
Assert.assertEquals(info.getMemoryTotal(), node.getTotalMemBytes());
@@ -50,7 +50,7 @@ public class TestNodeImpl {
mounts.add(new DiskInfo("/dev/sda", "/mnt/disk1",
"5000000", "4000000", "10%", "size"));
info.setMounts(mounts);
-
+
info.setHostName("foo");
info.setInterfaces("fip_4");
info.setArchitecture("os_arch");
@@ -64,7 +64,7 @@ public class TestNodeImpl {
new NodeRegistrationRequestEvent("foo", agentVersion, currentTime,
info);
node.handleEvent(e);
- Assert.assertEquals(node.getLastRegistrationTime(), currentTime);
+ Assert.assertEquals(currentTime, node.getLastRegistrationTime());
}
private void verifyNode(NodeImpl node) throws Exception {
@@ -73,7 +73,7 @@ public class TestNodeImpl {
}
private void verifyNodeState(NodeImpl node, NodeState state) {
- Assert.assertEquals(node.getState(), state);
+ Assert.assertEquals(state, node.getState());
}
private void sendHealthyHeartbeat(NodeImpl node, long counter) throws Exception {
@@ -146,61 +146,61 @@ public class TestNodeImpl {
long counter = 0;
sendHealthyHeartbeat(node, ++counter);
verifyNodeState(node, NodeState.HEALTHY);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
sendHealthyHeartbeat(node, ++counter);
verifyNodeState(node, NodeState.HEALTHY);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.HEALTHY);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.HEALTHY,
+ node.getHealthStatus().getHealthStatus());
sendUnhealthyHeartbeat(node, ++counter);
verifyNodeState(node, NodeState.UNHEALTHY);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.UNHEALTHY);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.UNHEALTHY,
+ node.getHealthStatus().getHealthStatus());
sendUnhealthyHeartbeat(node, ++counter);
verifyNodeState(node, NodeState.UNHEALTHY);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.UNHEALTHY);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.UNHEALTHY,
+ node.getHealthStatus().getHealthStatus());
sendHealthyHeartbeat(node, ++counter);
verifyNodeState(node, NodeState.HEALTHY);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.HEALTHY);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.HEALTHY,
+ node.getHealthStatus().getHealthStatus());
timeoutNode(node);
verifyNodeState(node, NodeState.HEARTBEAT_LOST);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.UNKNOWN);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.UNKNOWN,
+ node.getHealthStatus().getHealthStatus());
timeoutNode(node);
verifyNodeState(node, NodeState.HEARTBEAT_LOST);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.UNKNOWN);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.UNKNOWN,
+ node.getHealthStatus().getHealthStatus());
sendUnhealthyHeartbeat(node, ++counter);
verifyNodeState(node, NodeState.UNHEALTHY);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.UNHEALTHY);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.UNHEALTHY,
+ node.getHealthStatus().getHealthStatus());
timeoutNode(node);
verifyNodeState(node, NodeState.HEARTBEAT_LOST);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.UNKNOWN);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.UNKNOWN,
+ node.getHealthStatus().getHealthStatus());
sendHealthyHeartbeat(node, ++counter);
verifyNodeState(node, NodeState.HEALTHY);
- Assert.assertEquals(node.getLastHeartbeatTime(), counter);
- Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
- HealthStatus.HEALTHY);
+ Assert.assertEquals(counter, node.getLastHeartbeatTime());
+ Assert.assertEquals(HealthStatus.HEALTHY,
+ node.getHealthStatus().getHealthStatus());
}
}