You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by hi...@apache.org on 2012/09/11 23:35:24 UTC

svn commit: r1383625 - in /incubator/ambari/branches/AMBARI-666: ./ ambari-server/src/main/java/org/apache/ambari/server/state/live/job/ ambari-server/src/test/java/org/apache/ambari/server/state/live/node/

Author: hitesh
Date: Tue Sep 11 21:35:23 2012
New Revision: 1383625

URL: http://svn.apache.org/viewvc?rev=1383625&view=rev
Log:
AMBARI-714. Job FSM Impl and tests. (Contributed by hitesh)

Modified:
    incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java

Modified: incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt (original)
+++ incubator/ambari/branches/AMBARI-666/AMBARI-666-CHANGES.txt Tue Sep 11 21:35:23 2012
@@ -12,6 +12,8 @@ AMBARI-666 branch (unreleased changes)
 
   NEW FEATURES
 
+  AMBARI-714. Job FSM Impl and tests. (hitesh)
+
   AMBARI-721. Remove Hardwareprofile class since its not needed anymore.
   (mahadev)
 

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java Tue Sep 11 21:35:23 2012
@@ -4,6 +4,9 @@ public class JobCompletedEvent extends J
 
   private final long completionTime;
 
+  // TODO
+  // need to add job report
+
   public JobCompletedEvent(JobId jobId, long completionTime) {
     super(JobEventType.JOB_COMPLETED, jobId);
     this.completionTime = completionTime;

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java Tue Sep 11 21:35:23 2012
@@ -5,7 +5,7 @@ public class JobFailedEvent extends JobE
   private final long completionTime;
 
   // TODO
-  // need to add job failed reason
+  // need to add job report
 
   public JobFailedEvent(JobId jobId, long completionTime) {
     super(JobEventType.JOB_FAILED, jobId);

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java Tue Sep 11 21:35:23 2012
@@ -31,5 +31,8 @@ public class JobId {
     this.jobType = jobType;
   }
 
-
+  public String toString() {
+    return "[ jobId=" + jobId
+        + ", jobType=" + jobType + "]";
+  }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java Tue Sep 11 21:35:23 2012
@@ -43,7 +43,7 @@ public class JobImpl implements Job {
   private long completionTime;
 
   // TODO
-  // need to add job failed reason
+  // need to add job report
 
   private static final StateMachineFactory
     <JobImpl, JobState, JobEventType, JobEvent>
@@ -56,6 +56,12 @@ public class JobImpl implements Job {
 
     .addTransition(JobState.INIT, JobState.IN_PROGRESS,
         JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
+    .addTransition(JobState.INIT, JobState.COMPLETED,
+        JobEventType.JOB_COMPLETED, new JobCompletedTransition())
+    .addTransition(JobState.INIT, JobState.FAILED,
+        JobEventType.JOB_FAILED, new JobFailedTransition())
+    .addTransition(JobState.INIT, JobState.IN_PROGRESS,
+        JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
     .addTransition(JobState.IN_PROGRESS, JobState.IN_PROGRESS,
         JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
     .addTransition(JobState.IN_PROGRESS, JobState.COMPLETED,
@@ -71,16 +77,28 @@ public class JobImpl implements Job {
   private final StateMachine<JobState, JobEventType, JobEvent>
       stateMachine;
 
-  public JobImpl(JobId id) {
+  public JobImpl(JobId id, long startTime) {
     super();
     this.id = id;
     this.stateMachine = stateMachineFactory.make(this);
     ReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
-    startTime = -1;
-    lastUpdateTime = -1;
-    completionTime = -1;
+    this.startTime = startTime;
+    this.lastUpdateTime = -1;
+    this.completionTime = -1;
+  }
+
+  private void reset() {
+    try {
+      writeLock.lock();
+      this.startTime = -1;
+      this.lastUpdateTime = -1;
+      this.completionTime = -1;
+    }
+    finally {
+      writeLock.unlock();
+    }
   }
 
   static class NewJobTransition
@@ -90,6 +108,7 @@ public class JobImpl implements Job {
     public void transition(JobImpl job, JobEvent event) {
       NewJobEvent e = (NewJobEvent) event;
       // TODO audit logs
+      job.reset();
       job.setId(e.getJobId());
       job.setStartTime(e.getStartTime());
       LOG.info("Launching a new Job"

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java Tue Sep 11 21:35:23 2012
@@ -2,6 +2,11 @@ package org.apache.ambari.server.state.l
 
 public class JobType {
 
-  public String jobName;
+  public final String jobName;
+
+  public JobType(String jobName) {
+    super();
+    this.jobName = jobName;
+  }
 
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java Tue Sep 11 21:35:23 2012
@@ -4,9 +4,9 @@ public class NewJobEvent extends JobEven
 
   private final long startTime;
 
-  public NewJobEvent(JobId jobId, long creationTime) {
+  public NewJobEvent(JobId jobId, long startTime) {
     super(JobEventType.JOB_INIT, jobId);
-    this.startTime = creationTime;
+    this.startTime = startTime;
   }
 
   /**

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java?rev=1383625&r1=1383624&r2=1383625&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java Tue Sep 11 21:35:23 2012
@@ -23,7 +23,7 @@ public class TestNodeImpl {
     mounts.add(new DiskInfo("/dev/sda", "/mnt/disk1",
         "5000000", "4000000", "10%", "size"));
     info.setMounts(mounts);
-    
+
     info.setHostName("foo");
     info.setInterfaces("fip_4");
     info.setArchitecture("os_arch");
@@ -32,7 +32,7 @@ public class TestNodeImpl {
 
     NodeImpl node = new NodeImpl();
     node.importNodeInfo(info);
-   
+
     Assert.assertEquals(info.getHostName(), node.getHostName());
     Assert.assertEquals(info.getFreeMemory(), node.getAvailableMemBytes());
     Assert.assertEquals(info.getMemoryTotal(), node.getTotalMemBytes());
@@ -50,7 +50,7 @@ public class TestNodeImpl {
     mounts.add(new DiskInfo("/dev/sda", "/mnt/disk1",
         "5000000", "4000000", "10%", "size"));
     info.setMounts(mounts);
-    
+
     info.setHostName("foo");
     info.setInterfaces("fip_4");
     info.setArchitecture("os_arch");
@@ -64,7 +64,7 @@ public class TestNodeImpl {
         new NodeRegistrationRequestEvent("foo", agentVersion, currentTime,
             info);
     node.handleEvent(e);
-    Assert.assertEquals(node.getLastRegistrationTime(), currentTime);
+    Assert.assertEquals(currentTime, node.getLastRegistrationTime());
   }
 
   private void verifyNode(NodeImpl node) throws Exception {
@@ -73,7 +73,7 @@ public class TestNodeImpl {
   }
 
   private void verifyNodeState(NodeImpl node, NodeState state) {
-    Assert.assertEquals(node.getState(), state);
+    Assert.assertEquals(state, node.getState());
   }
 
   private void sendHealthyHeartbeat(NodeImpl node, long counter) throws Exception {
@@ -146,61 +146,61 @@ public class TestNodeImpl {
     long counter = 0;
     sendHealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.HEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
 
     sendHealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.HEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.HEALTHY);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.HEALTHY,
+        node.getHealthStatus().getHealthStatus());
 
     sendUnhealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.UNHEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.UNHEALTHY);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.UNHEALTHY,
+        node.getHealthStatus().getHealthStatus());
 
     sendUnhealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.UNHEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.UNHEALTHY);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.UNHEALTHY,
+        node.getHealthStatus().getHealthStatus());
 
     sendHealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.HEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.HEALTHY);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.HEALTHY,
+        node.getHealthStatus().getHealthStatus());
 
     timeoutNode(node);
     verifyNodeState(node, NodeState.HEARTBEAT_LOST);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.UNKNOWN);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.UNKNOWN,
+        node.getHealthStatus().getHealthStatus());
 
     timeoutNode(node);
     verifyNodeState(node, NodeState.HEARTBEAT_LOST);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.UNKNOWN);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.UNKNOWN,
+        node.getHealthStatus().getHealthStatus());
 
     sendUnhealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.UNHEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.UNHEALTHY);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.UNHEALTHY,
+        node.getHealthStatus().getHealthStatus());
 
     timeoutNode(node);
     verifyNodeState(node, NodeState.HEARTBEAT_LOST);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.UNKNOWN);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.UNKNOWN,
+        node.getHealthStatus().getHealthStatus());
 
     sendHealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.HEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
-        HealthStatus.HEALTHY);
+    Assert.assertEquals(counter, node.getLastHeartbeatTime());
+    Assert.assertEquals(HealthStatus.HEALTHY,
+        node.getHealthStatus().getHealthStatus());
 
   }
 }