You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@edgent.apache.org by vdogaru <gi...@git.apache.org> on 2016/03/31 02:54:33 UTC

[GitHub] incubator-quarks pull request: Quarks 66 vdogaru

GitHub user vdogaru opened a pull request:

    https://github.com/apache/incubator-quarks/pull/59

    Quarks 66 vdogaru

    Monitor application listens on JobRegistry events and resubmits unhealthy jobs. The monitored applications must be registered with an ApplicationService prior to submission, otherwise the monitor application cannot restart them. Packaged in new module quarks.apps.runtime.jar.
    
    The monitor application depends on the following service changes:
    - ApplicationService provides the names of registered applications
    - ControlService.queryInterfaces returns the registered controls which implement a specified interface.  Used by the monitor application to get ApplicationServiceMXBean control in order to restart failed applications.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vdogaru/incubator-quarks QUARKS-66-vdogaru

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-quarks/pull/59.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #59
    
----
commit da8b8a3bd14bf710638583e14a6155147ce7324f
Author: Victor Dogaru <vd...@apache.org>
Date:   2016-03-30T23:24:01Z

    Add ControlService.queryInterfaces()

commit aef8083c275d87559ee3b41ca68cc75c49d34a49
Author: Victor Dogaru <vd...@apache.org>
Date:   2016-03-30T23:25:07Z

    Add ApplicationService.getApplicationNames()

commit da41ef6ad17f91b38dffcb2d802576ab4b2ef067
Author: Victor Dogaru <vd...@apache.org>
Date:   2016-03-31T00:47:56Z

    MonitorApp and unit test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58106043
  
    --- Diff: apps/runtime/build.xml ---
    @@ -0,0 +1,29 @@
    +<project name="quarks.apps.runtime" default="all" 
    --- End diff --
    
    Missing ASF licence header.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by dlaboss <gi...@git.apache.org>.
Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58215570
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls = 
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " + 
    +                        ApplicationServiceMXBean.class.getName());                
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    --- End diff --
    
    Just trying to understand / imagine when there might be more than one ApplicationServiceMXBean registered and then what it might mean to submit the same applicationName multiple times once for each AS control? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58244003
  
    --- Diff: api/topology/src/main/java/quarks/topology/services/ApplicationService.java ---
    @@ -63,4 +65,11 @@ Licensed to the Apache Software Foundation (ASF) under one
          * @see ApplicationServiceMXBean
          */
         void registerTopology(String applicationName, BiConsumer<Topology, JsonObject> builder);
    +    
    --- End diff --
    
    I'm solving a concrete use case, where the device has a number of pre-installed applications but not all are running. Remotely an application can be enabled, for example when a customer pays for a specific feature/service, the device provider turns on that application and presumably events from the device start flowing back to a central system to provide the service to the customer.
    
    If there are other use cases that need to be supported additional functionality can be added, maybe the discussion could move out of this pull request.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by dlaboss <gi...@git.apache.org>.
Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58249473
  
    --- Diff: api/topology/src/main/java/quarks/topology/services/ApplicationService.java ---
    @@ -63,4 +65,11 @@ Licensed to the Apache Software Foundation (ASF) under one
          * @see ApplicationServiceMXBean
          */
         void registerTopology(String applicationName, BiConsumer<Topology, JsonObject> builder);
    +    
    --- End diff --
    
    Ah, I see how a register-after-submit wouldn't work for that case.  I was basing my comment/question on the just the "monitoring" and "restart failed jobs" use case in the title, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58217230
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls = 
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " + 
    +                        ApplicationServiceMXBean.class.getName());                
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    +        }
    +        catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +    
    +    /**
    +     * Declares the following topology:
    +     * <pre>
    +     * JobEvents source --> Filter (health == unhealthy) --> Restart application
    +     * </pre>
    +     * 
    +     * @param name the topology name
    +     * @return the application topology
    +     */
    +    protected Topology declareTopology(String name) {
    +        Topology t = provider.newTopology(name);
    +        TStream<JsonObject> jobEvents = JobEvents.source(
    +                t, 
    +                (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
    +                );
    +        jobEvents = PlumbingStreams.isolate(jobEvents, true);
    --- End diff --
    
    > Topology.source() and that defines the stream is isolated
    Not sure what that means? How does it define it as isolated?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by dlaboss <gi...@git.apache.org>.
Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58211233
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls = 
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " + 
    +                        ApplicationServiceMXBean.class.getName());                
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    +        }
    +        catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +    
    +    /**
    +     * Declares the following topology:
    +     * <pre>
    +     * JobEvents source --> Filter (health == unhealthy) --> Restart application
    +     * </pre>
    +     * 
    +     * @param name the topology name
    +     * @return the application topology
    +     */
    +    protected Topology declareTopology(String name) {
    +        Topology t = provider.newTopology(name);
    +        TStream<JsonObject> jobEvents = JobEvents.source(
    +                t, 
    +                (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
    +                );
    +        jobEvents = PlumbingStreams.isolate(jobEvents, true);
    +
    +        jobEvents = jobEvents.filter(
    +                value -> {
    +                    logger.trace("Filter: {}", value);
    +
    +                    try {
    +                        JsonObject job = MonitorAppEvent.getJob(value);
    +                        return (Job.Health.UNHEALTHY.name().equals(
    +                                MonitorAppEvent.getJobHealth(job)));
    +                    } catch (IllegalArgumentException e) {
    +                        logger.info("Invalid event filtered out, cause: {}", e.getMessage());
    +                        return false;
    +                    }
    +                 });
    +
    +        jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));
    +        return t;
    +    }
    +
    +    /**
    +     * A {@code Consumer} which restarts the application specified by a 
    +     * JSON object passed to its {@code accept} function. 
    +     */
    +    private static class JobRestarter implements Consumer<JsonObject> {
    +        private static final long serialVersionUID = 1L;
    +        private final Supplier<RuntimeServices> rts;
    +
    +        JobRestarter(Supplier<RuntimeServices> rts) {
    +            this.rts = rts;
    +        }
    +
    +        @Override
    +        public void accept(JsonObject value) {
    +            ControlService controlService = rts.get().getService(ControlService.class);
    +            JsonObject job = MonitorAppEvent.getJob(value);
    +            String applicationName = MonitorAppEvent.getJobName(job);
    +
    +            logger.info("Will restart monitored application {}, cause: {}", applicationName, value);
    +            submitApplication(MonitorAppEvent.getJobName(job), controlService);
    --- End diff --
    
    replace 2nd call to `getJobName()` with `applicationName`?  Not an efficiency concern, just makes things clearer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58108175
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    --- End diff --
    
    Not sure I understand the purpose of this method. If code can get the ControlService it can get the application service.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58229604
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls = 
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " + 
    +                        ApplicationServiceMXBean.class.getName());                
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    +        }
    +        catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +    
    +    /**
    +     * Declares the following topology:
    +     * <pre>
    +     * JobEvents source --> Filter (health == unhealthy) --> Restart application
    +     * </pre>
    +     * 
    +     * @param name the topology name
    +     * @return the application topology
    +     */
    +    protected Topology declareTopology(String name) {
    +        Topology t = provider.newTopology(name);
    +        TStream<JsonObject> jobEvents = JobEvents.source(
    +                t, 
    +                (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
    +                );
    +        jobEvents = PlumbingStreams.isolate(jobEvents, true);
    --- End diff --
    
    Ahh, thanks for the clarification.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by dlaboss <gi...@git.apache.org>.
Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58229460
  
    --- Diff: api/topology/src/main/java/quarks/topology/services/ApplicationService.java ---
    @@ -63,4 +65,11 @@ Licensed to the Apache Software Foundation (ASF) under one
          * @see ApplicationServiceMXBean
          */
         void registerTopology(String applicationName, BiConsumer<Topology, JsonObject> builder);
    +    
    --- End diff --
    
    Above for `registerTopology()` what are the requirements for the appName?  What happens if one by that name is already registered?  By definition elsewhere, is a Job's appName already required to be unique? Regardless, seems like it could help to add doc here to clarify things.
    
    I'm also wondering about this "register with appName prior to submit" model vs say "register the *Job* following the submit".  A post submit registration scheme seems to enable leaving it to the system/provider-impl to decide what to use as an identifier to find the topology-builder to rebuild/resubmit the job.   It also feels more logical to express "I want this Job monitored" rather than "I want a/all jobs with this appName monitored"... though maybe that's just me.  Does the pre-submit scheme handle recovery from certain startup failures that the post-submit scheme can't?
    
    Is there also a need for an unregisterTopology() or is it just that an explicitly cancelled job is effectively automatically unregistered?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58107906
  
    --- Diff: api/execution/src/main/java/quarks/execution/services/ControlService.java ---
    @@ -91,4 +93,18 @@ Licensed to the Apache Software Foundation (ASF) under one
          * Unregister a control bean registered by {@link #registerControl(String, String, String, Class, Object)}
          */
         void unregister(String controlId);
    +    
    +    /**
    +     * Return the controls registered with this service which implement 
    +     * the specified interface.  The interface had previously been used to 
    +     * {@linkplain ControlService#registerControl(String, String, String, Class, Object) register}
    +     * the control.
    +     * 
    +     * @param controlInterface
    +     *              Public interface identifying the controls to be retrieved. 
    +     * @return a set containing the controls registered with the given 
    +     *              interface. If no control satisfies the query, an empty 
    +     *              set is returned.
    +     */
    +    <T> Set<T> queryInterfaces(Class<T> controlInterface);
    --- End diff --
    
    It's not a 'query', it's just a 'get' (read its description). Rename to 'getControls' or 'getControlMBeans'??


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by vdogaru <gi...@git.apache.org>.
Github user vdogaru commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58245333
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls = 
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " + 
    +                        ApplicationServiceMXBean.class.getName());                
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    +        }
    +        catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +    
    +    /**
    +     * Declares the following topology:
    +     * <pre>
    +     * JobEvents source --> Filter (health == unhealthy) --> Restart application
    +     * </pre>
    +     * 
    +     * @param name the topology name
    +     * @return the application topology
    +     */
    +    protected Topology declareTopology(String name) {
    +        Topology t = provider.newTopology(name);
    +        TStream<JsonObject> jobEvents = JobEvents.source(
    +                t, 
    +                (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
    +                );
    +        jobEvents = PlumbingStreams.isolate(jobEvents, true);
    +
    +        jobEvents = jobEvents.filter(
    +                value -> {
    +                    logger.trace("Filter: {}", value);
    +
    +                    try {
    +                        JsonObject job = MonitorAppEvent.getJob(value);
    +                        return (Job.Health.UNHEALTHY.name().equals(
    +                                MonitorAppEvent.getJobHealth(job)));
    +                    } catch (IllegalArgumentException e) {
    +                        logger.info("Invalid event filtered out, cause: {}", e.getMessage());
    +                        return false;
    +                    }
    +                 });
    +
    +        jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));
    +        return t;
    +    }
    +
    +    /**
    +     * A {@code Consumer} which restarts the application specified by a 
    +     * JSON object passed to its {@code accept} function. 
    +     */
    +    private static class JobRestarter implements Consumer<JsonObject> {
    +        private static final long serialVersionUID = 1L;
    +        private final Supplier<RuntimeServices> rts;
    +
    +        JobRestarter(Supplier<RuntimeServices> rts) {
    +            this.rts = rts;
    +        }
    +
    +        @Override
    +        public void accept(JsonObject value) {
    +            ControlService controlService = rts.get().getService(ControlService.class);
    +            JsonObject job = MonitorAppEvent.getJob(value);
    +            String applicationName = MonitorAppEvent.getJobName(job);
    +
    +            logger.info("Will restart monitored application {}, cause: {}", applicationName, value);
    +            submitApplication(MonitorAppEvent.getJobName(job), controlService);
    --- End diff --
    
    This and your next comment resolved in https://github.com/apache/incubator-quarks/pull/65


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58112992
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    --- End diff --
    
    I think this indicates there needs to be some standardization around applications, so they don't each have to implement their own submit method. Thinking that maybe there is an interface applications can implement to provide consistency, and we encourage jobs to start through the ApplicationServiceMXBean, and not by themselves.
    
    I'll put together some proposal, it's wouldn't block this being merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by dlaboss <gi...@git.apache.org>.
Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58210635
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls = 
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " + 
    +                        ApplicationServiceMXBean.class.getName());                
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    +        }
    +        catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +    
    +    /**
    +     * Declares the following topology:
    +     * <pre>
    +     * JobEvents source --> Filter (health == unhealthy) --> Restart application
    +     * </pre>
    +     * 
    +     * @param name the topology name
    +     * @return the application topology
    +     */
    +    protected Topology declareTopology(String name) {
    +        Topology t = provider.newTopology(name);
    +        TStream<JsonObject> jobEvents = JobEvents.source(
    +                t, 
    +                (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
    +                );
    +        jobEvents = PlumbingStreams.isolate(jobEvents, true);
    --- End diff --
    
    Isn't the isolate superfluous?  JobEvents.source() just wraps Topology.source() and that defines the stream is isolated.  Probably good to add the same doc to JobEvents.source().


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-quarks/pull/59


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: [WIP] QUARKS-66 Job monitoring appl...

Posted by dlaboss <gi...@git.apache.org>.
Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58225991
  
    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which 
    + * an event has been emitted because the job is unhealthy. The monitored 
    + * applications must be registered with an {@code ApplicationService} 
    + * prior to submission, otherwise the monitor application cannot restart 
    + * them.</p>
    + * <p> 
    + * The monitoring application must be submitted within a context which 
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an 
    + * {@code ApplicationServiceMXBean} control, which is then used for 
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events. </li>
    + * </ul>
    + * </p>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the 
    +     * context of the specified provider.
    +     * 
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required 
    +     *      services and submits the application
    +     * @param name the application name
    +     * 
    +     * @throws IllegalArgumentException if the submitter does not provide 
    +     *      access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider, 
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +    
    +    /**
    +     * Submits the application topology.
    +     * 
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control 
    +     * registered with the specified {@code ControlService}.
    +     * 
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls = 
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " + 
    +                        ApplicationServiceMXBean.class.getName());                
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    +        }
    +        catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +    
    +    /**
    +     * Declares the following topology:
    +     * <pre>
    +     * JobEvents source --> Filter (health == unhealthy) --> Restart application
    +     * </pre>
    +     * 
    +     * @param name the topology name
    +     * @return the application topology
    +     */
    +    protected Topology declareTopology(String name) {
    +        Topology t = provider.newTopology(name);
    +        TStream<JsonObject> jobEvents = JobEvents.source(
    +                t, 
    +                (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
    +                );
    +        jobEvents = PlumbingStreams.isolate(jobEvents, true);
    --- End diff --
    
    Sorry for the confusion.  I meant wraps `Topology.events()` and its doc defines the stream as isolated [here](https://github.com/apache/incubator-quarks/blob/master/api/topology/src/main/java/quarks/topology/Topology.java#L116)  And the impl GraphTopology.events() already returns a PlumbingStreams.isolate()'d stream :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---