You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by vd...@apache.org on 2016/07/01 18:26:47 UTC

incubator-quarks git commit: QUARKS-215 Job control gets unregistered from ControlService - closes #153

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 172b1fc50 -> 31cd66583


QUARKS-215 Job control gets unregistered from ControlService - closes #153


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/31cd6658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/31cd6658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/31cd6658

Branch: refs/heads/master
Commit: 31cd66583ca30670dbe51e1f6f40b1643429ec21
Parents: 172b1fc
Author: Victor Dogaru <vd...@apache.org>
Authored: Fri Jul 1 11:25:19 2016 -0700
Committer: Victor Dogaru <vd...@apache.org>
Committed: Fri Jul 1 11:25:19 2016 -0700

----------------------------------------------------------------------
 .../execution/services/ControlService.java      |  14 ++
 .../quarks/execution/services/Controls.java     |   9 +-
 .../java/quarks/apps/runtime/JobMonitorApp.java |  47 ++++++-
 .../java/quarks/runtime/etiao/EtiaoJob.java     |  20 ++-
 .../runtime/etiao/mbeans/EtiaoJobBean.java      | 133 ++++++++++++++++++-
 .../runtime/jmxcontrol/JMXControlService.java   |  27 +++-
 .../runtime/jsoncontrol/JsonControlService.java |  18 ++-
 .../quarks/test/fvt/iot/IotProviderTest.java    |  18 ++-
 8 files changed, 256 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/api/execution/src/main/java/quarks/execution/services/ControlService.java
----------------------------------------------------------------------
diff --git a/api/execution/src/main/java/quarks/execution/services/ControlService.java b/api/execution/src/main/java/quarks/execution/services/ControlService.java
index 64d6bd7..698c42e 100644
--- a/api/execution/src/main/java/quarks/execution/services/ControlService.java
+++ b/api/execution/src/main/java/quarks/execution/services/ControlService.java
@@ -106,4 +106,18 @@ public interface ControlService {
      * @return Control Mbean or null if a matching MBean is not registered.
      */
     <T> T getControl(String type, String alias, Class<T> controlInterface);
+
+    /**
+     * Return the unique identifier of a control MBean registered with 
+     * this service under the given type, alias and control interface.
+     * 
+     * @param <T> Control MBean type
+     * @param type Type of the control MBean.
+     * @param alias Alias for the control MBean.
+     * @param controlInterface
+     *              Public interface of the control MBean. 
+     * @return unique identifier that can be used to unregister the control 
+     *              MBean, or null if a matching MBean is not registered.
+     */
+    <T> String getControlId(String type, String alias, Class<T> controlInterface);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/api/execution/src/main/java/quarks/execution/services/Controls.java
----------------------------------------------------------------------
diff --git a/api/execution/src/main/java/quarks/execution/services/Controls.java b/api/execution/src/main/java/quarks/execution/services/Controls.java
index 7629b8b..62bbea0 100644
--- a/api/execution/src/main/java/quarks/execution/services/Controls.java
+++ b/api/execution/src/main/java/quarks/execution/services/Controls.java
@@ -26,7 +26,14 @@ import java.lang.reflect.Method;
  * @see ControlService
  */
 public class Controls {
-    
+    /**
+     * Number of seconds a {@link quarks.execution.mbeans.JobMXBean JobMXBean}
+     * control remains registered with the {@link ControlService} after a job
+     * gets closed.  After this period the bean gets unregistered from the 
+     * service.
+     */
+    public final static int JOB_HOLD_AFTER_CLOSE_SECS = 10; 
+
     /**
      * Test to see if an interface represents a valid
      * control service MBean.

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
index 4bec9d1..39d67c6 100644
--- a/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
+++ b/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
@@ -30,7 +30,10 @@ import com.google.gson.JsonObject;
 
 import quarks.execution.DirectSubmitter;
 import quarks.execution.Job;
+import quarks.execution.Job.Action;
+import quarks.execution.mbeans.JobMXBean;
 import quarks.execution.services.ControlService;
+import quarks.execution.services.Controls;
 import quarks.execution.services.JobRegistryService;
 import quarks.execution.services.RuntimeServices;
 import quarks.function.Consumer;
@@ -129,6 +132,7 @@ public class JobMonitorApp {
                         ApplicationServiceMXBean.class.getName());                
             }
 // TODO add ability to submit with the initial application configuration
+            logger.info("Restarting monitored application {}", applicationName);
             control.submit(applicationName, null);
         }
         catch (Exception e) {
@@ -137,6 +141,47 @@ public class JobMonitorApp {
     }
     
     /**
+     * Closes a job using a {@code JobMXBean} control registered with the 
+     * specified {@code ControlService}.
+     * 
+     * @param jobName the name of the job
+     * @param controlService the control service
+     */
+    public static void closeJob(String jobName, ControlService controlService) {
+        try {
+            JobMXBean jobMbean = controlService.getControl(JobMXBean.TYPE, jobName, JobMXBean.class);
+            if (jobMbean == null) {
+                throw new IllegalStateException(
+                        "Could not find a registered control for job " + jobName + 
+                        " with the following interface: " + JobMXBean.class.getName());                
+            }
+            jobMbean.stateChange(Action.CLOSE);
+            logger.debug("Closing job {}", jobName);
+            
+            // Wait for the job to complete
+            long startWaiting = System.currentTimeMillis();
+            for (long waitForMillis = Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000;
+                    waitForMillis < 0;
+                    waitForMillis -= 100) {
+                if (jobMbean.getCurrentState() == Job.State.CLOSED)
+                    break;
+                else
+                    Thread.sleep(100);
+            }
+            if (jobMbean.getCurrentState() != Job.State.CLOSED) {
+                throw new IllegalStateException(
+                        "The unhealthy job " + jobName + " did not close after " + 
+                        Controls.JOB_HOLD_AFTER_CLOSE_SECS + " seconds");                
+            }
+            logger.debug("Job {} state is CLOSED after waiting for {} milliseconds",
+                    jobName, System.currentTimeMillis() - startWaiting);
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
      * Declares the following topology:
      * <pre>
      * JobEvents source --&gt; Filter (health == unhealthy) --&gt; Restart application
@@ -201,7 +246,7 @@ public class JobMonitorApp {
             JsonObject job = JobMonitorAppEvent.getJob(value);
             String applicationName = JobMonitorAppEvent.getJobName(job);
 
-            logger.info("Will restart monitored application {}, cause: {}", applicationName, value);
+            closeJob(applicationName, controlService);
             submitApplication(applicationName, controlService);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java b/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
index f106c45..c1fcbd9 100644
--- a/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
+++ b/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import quarks.execution.mbeans.JobMXBean;
 import quarks.execution.services.ControlService;
 import quarks.execution.services.JobRegistryService;
 import quarks.execution.services.ServiceContainer;
@@ -67,9 +66,9 @@ public class EtiaoJob extends AbstractGraphJob implements JobContext {
         this.containerServices = container;
 
         ControlService cs = container.getService(ControlService.class);
-        if (cs != null)
-            cs.registerControl(JobMXBean.TYPE, getId(), getName(), JobMXBean.class, new EtiaoJobBean(this));
-        
+        if (cs != null) {
+            EtiaoJobBean.registerControl(cs, this);
+        }            
         this.jobs = container.getService(JobRegistryService.class);
         if (jobs != null)
             jobs.addJob(this);
@@ -181,6 +180,19 @@ public class EtiaoJob extends AbstractGraphJob implements JobContext {
         }
     }
 
+    /**
+     * Complete job closing.  This method can be invoked after a job close has been initiated.
+     * Unless already CLOSED, throws {@code TimeoutException} when {@code awaitComplete} returns false.
+     */
+    public void completeClosing(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
+        if (unit == null)
+            throw new NullPointerException();
+        if (getCurrentState() != State.CLOSED &&
+                !awaitComplete(unit.toMillis(timeout))) {
+            throw new TimeoutException();
+        }
+    }
+
     private boolean awaitComplete(long millis) throws ExecutionException, InterruptedException {
         try {
             return executable().complete(millis);

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java b/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
index 2fa89cf..9b9ced8 100644
--- a/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
+++ b/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
@@ -18,24 +18,62 @@ under the License.
 */
 package quarks.runtime.etiao.mbeans;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
 import quarks.execution.Job;
 import quarks.execution.Job.Action;
 import quarks.execution.mbeans.JobMXBean;
+import quarks.execution.services.ControlService;
+import quarks.execution.services.Controls;
 import quarks.runtime.etiao.EtiaoJob;
 import quarks.runtime.etiao.graph.model.GraphType;
 
 /**
- * Implementation of a JMX control interface for the {@code EtiaoJob}.
+ * Implementation of a control interface for the {@code EtiaoJob}.
  */
 public class EtiaoJobBean implements JobMXBean {
     private final EtiaoJob job;
-    public EtiaoJobBean(EtiaoJob job) {
+    private ControlService controlService;
+    private String controlId;
+    private static final Logger logger = LoggerFactory.getLogger(EtiaoJobBean.class);
+    
+    /**
+     * Factory method which creates an {@code EtiaoJobBean} instance to
+     * control the specified {@code EtiaoJob} and registers it with the 
+     * specified {@code ControlService}.
+     * 
+     * @param cs the control service
+     * @param job the controlled job
+     * @return a registered bean instance
+     */
+    public static EtiaoJobBean registerControl(ControlService cs, EtiaoJob job) {
+        EtiaoJobBean bean = new EtiaoJobBean(job);
+        bean.registerControl(cs);
+        return bean;        
+    }
+
+    private EtiaoJobBean(EtiaoJob job) {
         this.job = job;
     }
 
+    public String getControlId() {
+        return controlId;
+    }
+
+    public boolean wasRegistered() {
+        return controlId != null;
+    }
+
     @Override
     public String getId() {
         return job.getId();
@@ -75,5 +113,96 @@ public class EtiaoJobBean implements JobMXBean {
     @Override
     public void stateChange(Action action) {
         job.stateChange(action);
+        if (wasRegistered() && action == Action.CLOSE) {
+            unregisterControlAsync();
+        }
+    }
+
+    private void registerControl(ControlService cs) {
+        if (cs == null)
+            throw new IllegalArgumentException("ControlService must not be null");
+        // Registers this bean under the job's name; a control left by a CLOSED job is replaced.
+        logger.trace("Registering control for job id {}, job name {}", job.getId(), job.getName());
+        // Remember the service: the control is unregistered from it after the job closes.
+        this.controlService = cs;
+        JobMXBean oldControl = cs.getControl(JobMXBean.TYPE, job.getName(), JobMXBean.class);
+        // The job name is the alias, so a control may already be registered under it.
+        if (oldControl != null) {
+            String oldControlId = cs.getControlId(JobMXBean.TYPE, job.getName(), JobMXBean.class);
+            if (oldControlId != null) {
+                if (isJobClosed(oldControl)) {
+                    cs.unregister(oldControlId);
+                    logger.debug("Old control id {} for CLOSED job name {} was unregistered", 
+                            oldControlId, job.getName());
+                }
+                else {
+                    throw new IllegalStateException(
+                        "Cannot register job control for alias " + 
+                        job.getName() + " because a job control with id " + oldControlId + 
+                        " for the same alias already exists and is not CLOSED");
+                }
+            }
+        }
+        this.controlId = cs.registerControl(JobMXBean.TYPE, job.getId(), 
+                job.getName(), JobMXBean.class, this);
+        logger.debug("Control for job id {}, job name {} was registered with id {}", 
+                job.getId(), job.getName(), controlId);
+    }
+
+    private void unregisterControlAsync() {
+        if (controlService == null)
+            throw new IllegalStateException(
+                    "The ControlService of a registered bean must not be null");
+        // unregisterControl() waits and sleeps, so run it on its own thread instead of the caller's.
+        getThread(new Runnable() {
+            @Override
+            public void run() {
+                unregisterControl();
+            }
+        }).start();
+    }
+
+    private void unregisterControl() {
+        if (!wasRegistered())
+            return;
+        // Wait for the job to finish closing, hold the control for the rest of the period, then unregister.
+        long startTime = System.currentTimeMillis();
+        try {
+            try {
+                job.completeClosing(Controls.JOB_HOLD_AFTER_CLOSE_SECS, TimeUnit.SECONDS);
+            } catch (ExecutionException e) {
+                String cause = e.getCause() != null ? e.getCause().getMessage() : "unknown";
+                logger.info("Error {} during completion of job {} caused by {}", 
+                        e.getMessage(), job.getName(), cause);
+                logger.debug("Error during completion of job " + job.getName(), e);
+            } catch (TimeoutException e) {
+                logger.info("Timed out after {} milliseconds waiting for job {} to complete", 
+                        (System.currentTimeMillis() - startTime), job.getName());                        
+            }
+            long remaining = startTime + Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000 - System.currentTimeMillis();
+            if (remaining < 0)
+                remaining = 0;
+            else
+                logger.trace("Job completed, waiting {} milliseconds before unregistering control {}", 
+                        remaining, controlId);
+            // Sleep out whatever is left of JOB_HOLD_AFTER_CLOSE_SECS, measured from startTime.
+            Thread.sleep(remaining);
+        } catch (InterruptedException e) {
+            // Interrupted while waiting: swallow the exception; the finally block still unregisters the control
+        }
+        finally {
+            controlService.unregister(controlId);
+            logger.trace("Control {} unregistered", controlId);
+        }
+    }
+
+    private Thread getThread(Runnable r) {
+        ThreadFactory threads = Executors.defaultThreadFactory();
+        return threads.newThread(r);
+    }
+    
+    private boolean isJobClosed(JobMXBean job) {
+        return job.getCurrentState() == Job.State.CLOSED &&
+                job.getNextState() == Job.State.CLOSED;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java b/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
index b03a3d9..0ad3092 100644
--- a/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
+++ b/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
@@ -104,7 +104,7 @@ public class JMXControlService implements ControlService {
             ObjectName on = ObjectName.getInstance(getDomain(), table);
             getMbs().registerMBean(control, on);
 
-            return on.getCanonicalName();
+            return getControlId(on);
         } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException
                 | MalformedObjectNameException e) {
             throw new RuntimeException(e);
@@ -128,28 +128,41 @@ public class JMXControlService implements ControlService {
 
     @Override
     public <T> T  getControl(String type, String alias, Class<T> controlInterface) {
+        MBeanServer mBeanServer = getMbs();
+        ObjectName name = getObjectNameForInterface(type, alias, controlInterface);
+        return name != null ? JMX.newMXBeanProxy(mBeanServer, name, controlInterface) : null;
+    }
+
+    @Override
+    public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
+        return getControlId(getObjectNameForInterface(type, alias, controlInterface));
+    }
+
+    private <T> ObjectName getObjectNameForInterface(String type, String alias, Class<T> controlInterface) {
         try {
-            MBeanServer mBeanServer = getMbs();
             Set<ObjectName> names = getObjectNamesForInterface(type, alias, controlInterface.getName());
             
             if (names.isEmpty())
                 return null;
             if (names.size() != 1)
                 throw new RuntimeException("Alias " + alias + " not unique for type " + type);
-            
-            T control = null;
-            
+    
+            ObjectName name = null;
             for (ObjectName on : names) {
-                control = JMX.newMXBeanProxy(mBeanServer, on, controlInterface);
+                name = on;
                 break;
             }
-            return control;
+            return name;
         }
         catch (MalformedObjectNameException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private static String getControlId(ObjectName on) {
+        return on != null ? on.getCanonicalName() : null;
+    }
+
     private Set<ObjectName> getObjectNamesForInterface(String type, String alias, String interfaceName) 
             throws MalformedObjectNameException {
         

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java b/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
index 9388f85..9b25778 100644
--- a/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
+++ b/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
@@ -244,16 +244,24 @@ public class JsonControlService implements ControlService {
     }
 
     @Override
-    public synchronized <T> T getControl(String type, String alias, Class<T> controlInterface) {
+    public <T> T getControl(String type, String alias, Class<T> controlInterface) {
         String controlId = getControlId(type, null, alias);
-        
+        ControlMBean<?> bean = getControlMBean(controlId, controlInterface);        
+        return bean != null ? controlInterface.cast(bean.getControl()) : null;
+    }
+
+    @Override
+    public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
+        String controlId = getControlId(type, null, alias);
+        return getControlMBean(controlId, controlInterface) != null ? controlId : null;
+    }
+
+    private synchronized <T> ControlMBean<?> getControlMBean(String controlId, Class<T> controlInterface) {
         ControlMBean<?> bean = mbeans.get(controlId);
         if (bean == null)
             return null;
-        
         if (bean.getControlInterface() != controlInterface)
             return null;
-        
-        return controlInterface.cast(bean.getControl());
+        return bean;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
----------------------------------------------------------------------
diff --git a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
index 2a7e261..6a1460b 100644
--- a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
+++ b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
@@ -19,6 +19,7 @@ under the License.
 package quarks.test.fvt.iot;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
@@ -42,6 +43,7 @@ import quarks.execution.Job;
 import quarks.execution.Job.Action;
 import quarks.execution.mbeans.JobMXBean;
 import quarks.execution.services.ControlService;
+import quarks.execution.services.Controls;
 import quarks.providers.iot.IotProvider;
 import quarks.runtime.jsoncontrol.JsonControlService;
 import quarks.test.apps.iot.EchoIotDevice;
@@ -157,7 +159,7 @@ public class IotProviderTest {
 
         jsc.controlRequest(closeJob);
 
-        // await for the job to complete
+        // Wait for the job to complete
         for (int i = 0; i < 30; i++) {
             Thread.sleep(100);
             if (jobMbean.getCurrentState() == Job.State.CLOSED)
@@ -165,14 +167,10 @@ public class IotProviderTest {
         }
         assertEquals(Job.State.CLOSED, jobMbean.getCurrentState());
 
-        // await for the associated control to be released
-//        for (int i = 0; i < 30; i++) {
-//            Thread.sleep(100);
-//            jobMbean = cs.getControl(JobMXBean.TYPE, "AppOne", JobMXBean.class);
-//            if (jobMbean == null)
-//                break;
-//        }
-//        assertNull(jobMbean);
-//        appStarter.stateChange(Action.CLOSE);
+        // Wait for the associated control to be released
+        Thread.sleep(1000 * (Controls.JOB_HOLD_AFTER_CLOSE_SECS + 1));
+        jobMbean = cs.getControl(JobMXBean.TYPE, "AppOne", JobMXBean.class);
+        assertNull(jobMbean);
+        appStarter.stateChange(Action.CLOSE);
     }
 }