You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by vd...@apache.org on 2016/07/01 18:26:47 UTC
incubator-quarks git commit: QUARKS-215 Job control gets unregistered
from ControlService - closes #153
Repository: incubator-quarks
Updated Branches:
refs/heads/master 172b1fc50 -> 31cd66583
QUARKS-215 Job control gets unregistered from ControlService - closes #153
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/31cd6658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/31cd6658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/31cd6658
Branch: refs/heads/master
Commit: 31cd66583ca30670dbe51e1f6f40b1643429ec21
Parents: 172b1fc
Author: Victor Dogaru <vd...@apache.org>
Authored: Fri Jul 1 11:25:19 2016 -0700
Committer: Victor Dogaru <vd...@apache.org>
Committed: Fri Jul 1 11:25:19 2016 -0700
----------------------------------------------------------------------
.../execution/services/ControlService.java | 14 ++
.../quarks/execution/services/Controls.java | 9 +-
.../java/quarks/apps/runtime/JobMonitorApp.java | 47 ++++++-
.../java/quarks/runtime/etiao/EtiaoJob.java | 20 ++-
.../runtime/etiao/mbeans/EtiaoJobBean.java | 133 ++++++++++++++++++-
.../runtime/jmxcontrol/JMXControlService.java | 27 +++-
.../runtime/jsoncontrol/JsonControlService.java | 18 ++-
.../quarks/test/fvt/iot/IotProviderTest.java | 18 ++-
8 files changed, 256 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/api/execution/src/main/java/quarks/execution/services/ControlService.java
----------------------------------------------------------------------
diff --git a/api/execution/src/main/java/quarks/execution/services/ControlService.java b/api/execution/src/main/java/quarks/execution/services/ControlService.java
index 64d6bd7..698c42e 100644
--- a/api/execution/src/main/java/quarks/execution/services/ControlService.java
+++ b/api/execution/src/main/java/quarks/execution/services/ControlService.java
@@ -106,4 +106,18 @@ public interface ControlService {
* @return Control Mbean or null if a matching MBean is not registered.
*/
<T> T getControl(String type, String alias, Class<T> controlInterface);
+
+ /**
+ * Return the unique identifier for a control MBean registered with
+ * this service.
+ *
+ * @param <T> Control MBean type
+ * @param type Type of the control MBean.
+ * @param alias Alias for the control MBean.
+ * @param controlInterface
+ * Public interface of the control MBean.
+ * @return unique identifier that can be used to unregister a control
+ * MBean or {@code null} if a matching MBean is not registered.
+ */
+ <T> String getControlId(String type, String alias, Class<T> controlInterface);
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/api/execution/src/main/java/quarks/execution/services/Controls.java
----------------------------------------------------------------------
diff --git a/api/execution/src/main/java/quarks/execution/services/Controls.java b/api/execution/src/main/java/quarks/execution/services/Controls.java
index 7629b8b..62bbea0 100644
--- a/api/execution/src/main/java/quarks/execution/services/Controls.java
+++ b/api/execution/src/main/java/quarks/execution/services/Controls.java
@@ -26,7 +26,14 @@ import java.lang.reflect.Method;
* @see ControlService
*/
public class Controls {
-
+ /**
+ * Number of seconds a {@link quarks.execution.mbeans.JobMXBean JobMXBean}
+ * control remains registered with the {@link ControlService} after a job
+ * gets closed. After this period the bean gets unregistered from the
+ * service.
+ */
+ public final static int JOB_HOLD_AFTER_CLOSE_SECS = 10;
+
/**
* Test to see if an interface represents a valid
* control service MBean.
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
index 4bec9d1..39d67c6 100644
--- a/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
+++ b/apps/runtime/src/main/java/quarks/apps/runtime/JobMonitorApp.java
@@ -30,7 +30,10 @@ import com.google.gson.JsonObject;
import quarks.execution.DirectSubmitter;
import quarks.execution.Job;
+import quarks.execution.Job.Action;
+import quarks.execution.mbeans.JobMXBean;
import quarks.execution.services.ControlService;
+import quarks.execution.services.Controls;
import quarks.execution.services.JobRegistryService;
import quarks.execution.services.RuntimeServices;
import quarks.function.Consumer;
@@ -129,6 +132,7 @@ public class JobMonitorApp {
ApplicationServiceMXBean.class.getName());
}
// TODO add ability to submit with the initial application configuration
+ logger.info("Restarting monitored application {}", applicationName);
control.submit(applicationName, null);
}
catch (Exception e) {
@@ -137,6 +141,47 @@ public class JobMonitorApp {
}
/**
+ * Closes a job using a {@code JobMXBean} control registered with the specified
+ * {@code ControlService} and waits for it to reach CLOSED; failures surface as {@code RuntimeException}.
+ *
+ * @param jobName the name of the job
+ * @param controlService the control service
+ */
+ public static void closeJob(String jobName, ControlService controlService) {
+ try {
+ JobMXBean jobMbean = controlService.getControl(JobMXBean.TYPE, jobName, JobMXBean.class);
+ if (jobMbean == null) {
+ throw new IllegalStateException(
+ "Could not find a registered control for job " + jobName +
+ " with the following interface: " + JobMXBean.class.getName());
+ }
+ jobMbean.stateChange(Action.CLOSE);
+ logger.debug("Closing job {}", jobName);
+
+ // Poll every 100 ms, for at most JOB_HOLD_AFTER_CLOSE_SECS seconds, until the job reports CLOSED
+ long startWaiting = System.currentTimeMillis();
+ for (long waitForMillis = Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000;
+ waitForMillis > 0;
+ waitForMillis -= 100) {
+ if (jobMbean.getCurrentState() == Job.State.CLOSED)
+ break;
+ else
+ Thread.sleep(100);
+ }
+ if (jobMbean.getCurrentState() != Job.State.CLOSED) {
+ throw new IllegalStateException(
+ "The unhealthy job " + jobName + " did not close after " +
+ Controls.JOB_HOLD_AFTER_CLOSE_SECS + " seconds");
+ }
+ logger.debug("Job {} state is CLOSED after waiting for {} milliseconds",
+ jobName, System.currentTimeMillis() - startWaiting);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
* Declares the following topology:
* <pre>
* JobEvents source --> Filter (health == unhealthy) --> Restart application
@@ -201,7 +246,7 @@ public class JobMonitorApp {
JsonObject job = JobMonitorAppEvent.getJob(value);
String applicationName = JobMonitorAppEvent.getJobName(job);
- logger.info("Will restart monitored application {}, cause: {}", applicationName, value);
+ closeJob(applicationName, controlService);
submitApplication(applicationName, controlService);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java b/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
index f106c45..c1fcbd9 100644
--- a/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
+++ b/runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import quarks.execution.mbeans.JobMXBean;
import quarks.execution.services.ControlService;
import quarks.execution.services.JobRegistryService;
import quarks.execution.services.ServiceContainer;
@@ -67,9 +66,9 @@ public class EtiaoJob extends AbstractGraphJob implements JobContext {
this.containerServices = container;
ControlService cs = container.getService(ControlService.class);
- if (cs != null)
- cs.registerControl(JobMXBean.TYPE, getId(), getName(), JobMXBean.class, new EtiaoJobBean(this));
-
+ if (cs != null) {
+ EtiaoJobBean.registerControl(cs, this);
+ }
this.jobs = container.getService(JobRegistryService.class);
if (jobs != null)
jobs.addJob(this);
@@ -181,6 +180,19 @@ public class EtiaoJob extends AbstractGraphJob implements JobContext {
}
}
+ /**
+ * Completes job closing; may be invoked after a job close has been initiated. Returns at once when the job is
+ * already CLOSED; otherwise calls awaitComplete for the given timeout and throws {@code TimeoutException} if it returns false.
+ */
+ public void completeClosing(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
+ if (unit == null)
+ throw new NullPointerException();
+ if (getCurrentState() != State.CLOSED &&
+ !awaitComplete(unit.toMillis(timeout))) {
+ throw new TimeoutException();
+ }
+ }
+
private boolean awaitComplete(long millis) throws ExecutionException, InterruptedException {
try {
return executable().complete(millis);
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java b/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
index 2fa89cf..9b9ced8 100644
--- a/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
+++ b/runtime/etiao/src/main/java/quarks/runtime/etiao/mbeans/EtiaoJobBean.java
@@ -18,24 +18,62 @@ under the License.
*/
package quarks.runtime.etiao.mbeans;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import quarks.execution.Job;
import quarks.execution.Job.Action;
import quarks.execution.mbeans.JobMXBean;
+import quarks.execution.services.ControlService;
+import quarks.execution.services.Controls;
import quarks.runtime.etiao.EtiaoJob;
import quarks.runtime.etiao.graph.model.GraphType;
/**
- * Implementation of a JMX control interface for the {@code EtiaoJob}.
+ * Implementation of a control interface for the {@code EtiaoJob}.
*/
public class EtiaoJobBean implements JobMXBean {
private final EtiaoJob job;
- public EtiaoJobBean(EtiaoJob job) {
+ private ControlService controlService;
+ private String controlId;
+ private static final Logger logger = LoggerFactory.getLogger(EtiaoJobBean.class);
+
+ /**
+ * Factory method which creates an {@code EtiaoJobBean} instance to
+ * control the specified {@code EtiaoJob} and registers it with the
+ * specified {@code ControlService}.
+ *
+ * @param cs the control service; must not be null
+ * @param job the controlled job
+ * @return a registered bean instance
+ */
+ public static EtiaoJobBean registerControl(ControlService cs, EtiaoJob job) {
+ EtiaoJobBean bean = new EtiaoJobBean(job);
+ bean.registerControl(cs); // throws if cs is null or a job that is not CLOSED already holds this job name
+ return bean;
+ }
+
+ private EtiaoJobBean(EtiaoJob job) {
this.job = job;
}
+ public String getControlId() { // id returned by ControlService.registerControl; null until this bean has been registered
+ return controlId;
+ }
+
+ public boolean wasRegistered() { // true once registerControl(ControlService) has stored a non-null control id
+ return controlId != null;
+ }
+
@Override
public String getId() {
return job.getId();
@@ -75,5 +113,96 @@ public class EtiaoJobBean implements JobMXBean {
@Override
public void stateChange(Action action) {
job.stateChange(action);
+ if (wasRegistered() && action == Action.CLOSE) { // a CLOSE request on a registered bean also schedules removal of its control
+ unregisterControlAsync();
+ }
+ }
+
+ private void registerControl(ControlService cs) { // registers this bean under the job's name, first removing a control left behind by a CLOSED job
+ if (cs == null)
+ throw new IllegalArgumentException("ControlService must not be null");
+
+ logger.trace("Registering control for job id {}, job name {}", job.getId(), job.getName());
+
+ this.controlService = cs; // kept so the control can be unregistered later
+ JobMXBean oldControl = cs.getControl(JobMXBean.TYPE, job.getName(), JobMXBean.class); // the job name serves as the control alias
+
+ if (oldControl != null) {
+ String oldControlId = cs.getControlId(JobMXBean.TYPE, job.getName(), JobMXBean.class);
+ if (oldControlId != null) {
+ if (isJobClosed(oldControl)) { // the previous job with this name is CLOSED: drop its control before registering
+ cs.unregister(oldControlId);
+ logger.debug("Old control id {} for CLOSED job name {} was unregistered",
+ oldControlId, job.getName());
+ }
+ else { // a job that is not CLOSED still owns this alias: refuse to register a second control
+ throw new IllegalStateException(
+ "Cannot register job control for alias " +
+ job.getName() + " because a job control with id " + oldControlId +
+ " for the same alias already exists and is not CLOSED");
+ }
+ }
+ }
+ this.controlId = cs.registerControl(JobMXBean.TYPE, job.getId(),
+ job.getName(), JobMXBean.class, this);
+ logger.debug("Control for job id {}, job name {} was registered with id {}",
+ job.getId(), job.getName(), controlId);
+ }
+
+ private void unregisterControlAsync() { // runs unregisterControl() on a newly started thread, so the caller returns without waiting
+ if (controlService == null)
+ throw new IllegalStateException(
+ "The ControlService of a registered bean must not be null");
+
+ getThread(new Runnable() {
+ @Override
+ public void run() {
+ unregisterControl();
+ }
+ }).start();
+ }
+
+ private void unregisterControl() { // waits for the job to finish closing, holds the control for the rest of the hold period, then unregisters it
+ if (!wasRegistered())
+ return;
+
+ long startTime = System.currentTimeMillis();
+ try {
+ try {
+ job.completeClosing(Controls.JOB_HOLD_AFTER_CLOSE_SECS, TimeUnit.SECONDS);
+ } catch (ExecutionException e) { // logged only; the control is still unregistered below
+ String cause = e.getCause() != null ? e.getCause().getMessage() : "unknown";
+ logger.info("Error {} during completion of job {} caused by {}",
+ e.getMessage(), job.getName(), cause);
+ logger.debug("Error during completion of job " + job.getName(), e);
+ } catch (TimeoutException e) { // logged only; the control is still unregistered below
+ logger.info("Timed out after {} milliseconds waiting for job {} to complete",
+ (System.currentTimeMillis() - startTime), job.getName());
+ }
+ long remaining = startTime + Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000 - System.currentTimeMillis(); // what is left of the hold period, measured from startTime
+ if (remaining < 0)
+ remaining = 0;
+ else
+ logger.trace("Job completed, waiting {} milliseconds before unregistering control {}",
+ remaining, controlId);
+
+ Thread.sleep(remaining);
+ } catch (InterruptedException e) {
+ // Interrupted while waiting: skip the rest of the hold period; the finally block still unregisters the control
+ }
+ finally {
+ controlService.unregister(controlId);
+ logger.trace("Control {} unregistered", controlId);
+ }
+ }
+
+ private Thread getThread(Runnable r) { // creates, but does not start, a thread for r using the default thread factory
+ ThreadFactory threads = Executors.defaultThreadFactory();
+ return threads.newThread(r);
+ }
+
+ private boolean isJobClosed(JobMXBean job) { // closed only when both the current and the next state are CLOSED
+ return job.getCurrentState() == Job.State.CLOSED &&
+ job.getNextState() == Job.State.CLOSED;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java b/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
index b03a3d9..0ad3092 100644
--- a/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
+++ b/runtime/jmxcontrol/src/main/java/quarks/runtime/jmxcontrol/JMXControlService.java
@@ -104,7 +104,7 @@ public class JMXControlService implements ControlService {
ObjectName on = ObjectName.getInstance(getDomain(), table);
getMbs().registerMBean(control, on);
- return on.getCanonicalName();
+ return getControlId(on);
} catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException
| MalformedObjectNameException e) {
throw new RuntimeException(e);
@@ -128,28 +128,41 @@ public class JMXControlService implements ControlService {
@Override
public <T> T getControl(String type, String alias, Class<T> controlInterface) {
+ MBeanServer mBeanServer = getMbs();
+ ObjectName name = getObjectNameForInterface(type, alias, controlInterface); // null when no MBean matches
+ return name != null ? JMX.newMXBeanProxy(mBeanServer, name, controlInterface) : null; // callers get an MXBean proxy, or null when nothing matched
+ }
+
+ @Override
+ public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
+ return getControlId(getObjectNameForInterface(type, alias, controlInterface)); // canonical name of the matching ObjectName, or null when none matches
+ }
+
+ private <T> ObjectName getObjectNameForInterface(String type, String alias, Class<T> controlInterface) {
try {
- MBeanServer mBeanServer = getMbs();
Set<ObjectName> names = getObjectNamesForInterface(type, alias, controlInterface.getName());
if (names.isEmpty())
return null;
if (names.size() != 1)
throw new RuntimeException("Alias " + alias + " not unique for type " + type);
-
- T control = null;
-
+
+ ObjectName name = null;
for (ObjectName on : names) {
- control = JMX.newMXBeanProxy(mBeanServer, on, controlInterface);
+ name = on;
break;
}
- return control;
+ return name;
}
catch (MalformedObjectNameException e) {
throw new RuntimeException(e);
}
}
+ private static String getControlId(ObjectName on) { // null-safe: the control id is the ObjectName's canonical name
+ return on != null ? on.getCanonicalName() : null;
+ }
+
private Set<ObjectName> getObjectNamesForInterface(String type, String alias, String interfaceName)
throws MalformedObjectNameException {
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java b/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
index 9388f85..9b25778 100644
--- a/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
+++ b/runtime/jsoncontrol/src/main/java/quarks/runtime/jsoncontrol/JsonControlService.java
@@ -244,16 +244,24 @@ public class JsonControlService implements ControlService {
}
@Override
- public synchronized <T> T getControl(String type, String alias, Class<T> controlInterface) {
+ public <T> T getControl(String type, String alias, Class<T> controlInterface) {
String controlId = getControlId(type, null, alias);
-
+ ControlMBean<?> bean = getControlMBean(controlId, controlInterface);
+ return bean != null ? controlInterface.cast(bean.getControl()) : null;
+ }
+
+ @Override
+ public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
+ String controlId = getControlId(type, null, alias);
+ return getControlMBean(controlId, controlInterface) != null ? controlId : null; // the id is returned only if a bean with that id and this exact control interface is registered
+ }
+
+ private synchronized <T> ControlMBean<?> getControlMBean(String controlId, Class<T> controlInterface) {
ControlMBean<?> bean = mbeans.get(controlId);
if (bean == null)
return null;
-
if (bean.getControlInterface() != controlInterface)
return null;
-
- return controlInterface.cast(bean.getControl());
+ return bean;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/31cd6658/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
----------------------------------------------------------------------
diff --git a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
index 2a7e261..6a1460b 100644
--- a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
+++ b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
@@ -19,6 +19,7 @@ under the License.
package quarks.test.fvt.iot;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -42,6 +43,7 @@ import quarks.execution.Job;
import quarks.execution.Job.Action;
import quarks.execution.mbeans.JobMXBean;
import quarks.execution.services.ControlService;
+import quarks.execution.services.Controls;
import quarks.providers.iot.IotProvider;
import quarks.runtime.jsoncontrol.JsonControlService;
import quarks.test.apps.iot.EchoIotDevice;
@@ -157,7 +159,7 @@ public class IotProviderTest {
jsc.controlRequest(closeJob);
- // await for the job to complete
+ // Wait for the job to complete
for (int i = 0; i < 30; i++) {
Thread.sleep(100);
if (jobMbean.getCurrentState() == Job.State.CLOSED)
@@ -165,14 +167,10 @@ public class IotProviderTest {
}
assertEquals(Job.State.CLOSED, jobMbean.getCurrentState());
- // await for the associated control to be released
-// for (int i = 0; i < 30; i++) {
-// Thread.sleep(100);
-// jobMbean = cs.getControl(JobMXBean.TYPE, "AppOne", JobMXBean.class);
-// if (jobMbean == null)
-// break;
-// }
-// assertNull(jobMbean);
-// appStarter.stateChange(Action.CLOSE);
+ // Wait for the associated control to be released
+ Thread.sleep(1000 * (Controls.JOB_HOLD_AFTER_CLOSE_SECS + 1));
+ jobMbean = cs.getControl(JobMXBean.TYPE, "AppOne", JobMXBean.class);
+ assertNull(jobMbean);
+ appStarter.stateChange(Action.CLOSE);
}
}