You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@edgent.apache.org by vdogaru <gi...@git.apache.org> on 2016/03/14 19:36:55 UTC

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

GitHub user vdogaru opened a pull request:

    https://github.com/apache/incubator-quarks/pull/9

    QUARKS-8 [WIP] Restart topology on uncaught exception

    [Job registry and sample monitoring](https://github.com/apache/incubator-quarks/commit/6587c5c4c66104d386585349d1e74439c8ca95cd) contains:
     * JobRegistryService which allows clients to register jobs, access registered jobs, and get notified when jobs are added / removed / updated.  Job event handlers are BiConsumer implementations which take an event type and the related Job instance as arguments
     * JobRegistry - JobRegistryService implementation used by the ETIAO runtime
     * Sample code demonstrating a system monitoring application which polls the registry for the current state of registered jobs.
    
    Does it sound like I am on the right track here? I'd appreciate some early feedback.  
    
    The following are still to be done:
     * Sample system monitoring application using events rather than polling.
     * System app which restarts closed jobs
     * Tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vdogaru/incubator-quarks QUARKS-8-vdogaru

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-quarks/pull/9.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9
    
----
commit 6587c5c4c66104d386585349d1e74439c8ca95cd
Author: Victor Dogaru <vd...@apache.org>
Date:   2016-03-14T18:11:12Z

    Job registry and sample monitoring app

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Job registry service

Posted by vdogaru <gi...@git.apache.org>.
Github user vdogaru commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56089774
  
    --- Diff: runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java ---
    @@ -51,6 +53,10 @@ public EtiaoJob(DirectGraph graph, String topologyName, ServiceContainer contain
             ControlService cs = container.getService(ControlService.class);
             if (cs != null)
                 cs.registerControl(JobMXBean.TYPE, getId(), getName(), JobMXBean.class, new EtiaoJobBean(this));
    +        
    +        this.jobs = (JobRegistry) container.getService(JobRegistryService.class);
    --- End diff --
    
    I considered the following alternatives:
       a) JobRegistry is an implementation class, then the runtime should register the registry, rather than the application. 
       b) JobRegistry has nothing specific to the etaio runtime, can be in its own module at the same level with JSONControl and JMXControl services. The JobRegistryService interface provides an add() method and is part of the same module.
    
    I think (b) has the advantage of modularity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56057266
  
    --- Diff: api/execution/src/main/java/quarks/execution/JobRegistryService.java ---
    @@ -0,0 +1,95 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.execution;
    +
    +import java.util.Set;
    +
    +import quarks.function.BiConsumer;
    +
    +/**
    + * Contains the methods necessary for the registration and removal
    --- End diff --
    
    Probably could simplify this first sentence as its the one-line used as the jaavdoc summary, maybe something as simple as "Job registration service". Additional details follow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Job registry service

Posted by vdogaru <gi...@git.apache.org>.
Github user vdogaru commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56067001
  
    --- Diff: samples/topology/src/main/java/quarks/samples/topology/JobPollingSample.java ---
    @@ -0,0 +1,130 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.samples.topology;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +import quarks.execution.Job;
    +import quarks.execution.JobRegistryService;
    +import quarks.providers.development.DevelopmentProvider;
    +import quarks.runtime.etiao.JobRegistry;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +
    +/**
    + * Job monitoring by polling job state. 
    + * <p>
    + * Demonstrates job monitoring using the {@link JobRegistryService} service.
    + * The example starts a system monitoring application then concurrently 
    + * submits two jobs.  The monitoring application is using a polling source
    + * to periodically scan the job registry and generates tuples containing the 
    + * current state of registered jobs. Tuples are pushed to a sink which prints
    + * them onto the system output.
    + */
    +public class JobPollingSample {
    +    private final DevelopmentProvider dtp;
    +    
    +    public static void main(String[] args) throws Exception {
    +        
    +        JobPollingSample app = new JobPollingSample();
    +
    +        // Start monitoring app
    +        app.monitor();
    +
    +        Thread.sleep(2000);
    +
    +        // Asynchronously start two jobs
    --- End diff --
    
    Right, I intend to replace this sample with a system application which uses an events source rather than polling to demonstrate the correct way for monitoring jobs.  Any inline sleeps a inserted to simulate a real case scenario, I'll properly document them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by vdogaru <gi...@git.apache.org>.
Github user vdogaru commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56062002
  
    --- Diff: runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java ---
    @@ -51,6 +53,10 @@ public EtiaoJob(DirectGraph graph, String topologyName, ServiceContainer contain
             ControlService cs = container.getService(ControlService.class);
             if (cs != null)
                 cs.registerControl(JobMXBean.TYPE, getId(), getName(), JobMXBean.class, new EtiaoJobBean(this));
    +        
    +        this.jobs = (JobRegistry) container.getService(JobRegistryService.class);
    --- End diff --
    
    Thanks for catching this.  My initial thinking was that a JobRegistryService provides restricted functionality to the application code: query, subscribe to job changes, and remove jobs, while **adding** jobs to the registry is the responsibility of the runtime, which would directly call the service implementation. A job is registered with a CONSTRUCTED state when a DirectTopology is created, then it changes to RUNNING when the topology is successfully submitted.  Allowing application code to register jobs might clash with the current implementation if applications add jobs already registered by the runtime.
    
    Currently the JobRegistryService docs indicate "registration" of Jobs as well, but there is no add() method, which is confusing. Which of the following is preferable?
    1. JobRegistryService restricted to job query / removal (I'll update the javadocs accordingly.)
    2. Add registration to JobRegistryService.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56062476
  
    --- Diff: runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java ---
    @@ -51,6 +53,10 @@ public EtiaoJob(DirectGraph graph, String topologyName, ServiceContainer contain
             ControlService cs = container.getService(ControlService.class);
             if (cs != null)
                 cs.registerControl(JobMXBean.TYPE, getId(), getName(), JobMXBean.class, new EtiaoJobBean(this));
    +        
    +        this.jobs = (JobRegistry) container.getService(JobRegistryService.class);
    --- End diff --
    
    Maybe then it's the sole responsibility of the runtime to register a JobRegistry and thus JobRegistry would not be a public class, but instead an implementation detail
    
    Then it would not be expected that external code registers a JobRegistryService .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56056243
  
    --- Diff: api/execution/src/main/java/quarks/execution/JobRegistryService.java ---
    @@ -0,0 +1,95 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.execution;
    --- End diff --
    
    Other services are in `quarks.execution.services`, any reason for this being in a different package?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by vdogaru <gi...@git.apache.org>.
Github user vdogaru commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56062394
  
    --- Diff: api/execution/src/main/java/quarks/execution/JobRegistryService.java ---
    @@ -0,0 +1,95 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.execution;
    --- End diff --
    
    Thanks, I'll move to ```quarks.execution.services```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56054847
  
    --- Diff: runtime/etiao/src/main/java/quarks/runtime/etiao/EtiaoJob.java ---
    @@ -51,6 +53,10 @@ public EtiaoJob(DirectGraph graph, String topologyName, ServiceContainer contain
             ControlService cs = container.getService(ControlService.class);
             if (cs != null)
                 cs.registerControl(JobMXBean.TYPE, getId(), getName(), JobMXBean.class, new EtiaoJobBean(this));
    +        
    +        this.jobs = (JobRegistry) container.getService(JobRegistryService.class);
    --- End diff --
    
    Cannot assume the JobRegistryService is a JobRegistry which implies the add method should be on the JobRegistryService interface.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by vdogaru <gi...@git.apache.org>.
Github user vdogaru commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56058176
  
    --- Diff: samples/topology/src/main/java/quarks/samples/topology/JobPollingSample.java ---
    @@ -0,0 +1,130 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.samples.topology;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +import quarks.execution.Job;
    +import quarks.execution.JobRegistryService;
    +import quarks.providers.development.DevelopmentProvider;
    +import quarks.runtime.etiao.JobRegistry;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +
    +/**
    + * Job monitoring by polling job state. 
    + * <p>
    + * Demonstrates job monitoring using the {@link JobRegistryService} service.
    + * The example starts a system monitoring application then concurrently 
    + * submits two jobs.  The monitoring application is using a polling source
    + * to periodically scan the job registry and generates tuples containing the 
    + * current state of registered jobs. Tuples are pushed to a sink which prints
    + * them onto the system output.
    + */
    +public class JobPollingSample {
    +    private final DevelopmentProvider dtp;
    +    
    +    public static void main(String[] args) throws Exception {
    +        
    +        JobPollingSample app = new JobPollingSample();
    +
    +        // Start monitoring app
    +        app.monitor();
    +
    +        Thread.sleep(2000);
    +
    +        // Asynchronously start two jobs
    +        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    +        executor.schedule(app.getCallable("Monitored1"), 100, TimeUnit.MILLISECONDS);
    +        executor.schedule(app.getCallable("Monitored2"), 3300, TimeUnit.MILLISECONDS);
    +    }
    +
    +    JobPollingSample() throws Exception {
    +        this.dtp = new DevelopmentProvider();
    +        dtp.getServices().addService(JobRegistryService.class, new JobRegistry());
    +    }
    +
    +    void monitored(String name) throws InterruptedException, ExecutionException {
    +        Topology t = dtp.newTopology(name);
    +        
    +        Random r = new Random();
    +        TStream<Double> d  = t.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
    +        d.sink(tuple -> System.out.print("."));
    +
    +        Thread.sleep(2000);
    +        Future<Job> f = dtp.submit(t);
    +        Job job = f.get();
    +        Thread.sleep(5000);
    +        job.stateChange(Job.Action.CLOSE);
    +        Thread.sleep(2000);
    +
    +        provider().getServices().getService(JobRegistryService.class).removeJob(job.getId());
    +    }
    +    
    +    /**
    +     * Monitoring application polls the job registry every 1 sec.
    +     */
    +    void monitor() {
    +        Topology t = dtp.newTopology("Monitor");
    +
    +        TStream<Job.State[]> state = t.poll(() -> {
    +                JobRegistryService jobs = provider().getServices().getService(JobRegistryService.class);
    +                List<Job.State> states = new ArrayList<>();
    +                if (jobs != null) {
    +                    for (String id: jobs.getJobIds()) {
    +                        states.add(jobs.getJob(id).getCurrentState());
    +                    }
    +                }
    +                return states.toArray(new Job.State[0]);
    +            }, 1, TimeUnit.SECONDS);
    +
    +        state.sink(states -> {
    +                StringBuffer sb = new StringBuffer();
    +                for (Job.State s : states) {
    +                    sb.append(s).append(',');
    +                }
    +                System.out.println(sb.toString());
    +            });
    +        
    +        dtp.submit(t);
    +    }
    +    
    +    private DevelopmentProvider provider() {
    +        return dtp;
    +    }
    +    
    +    private Callable<Integer> getCallable(String name) {
    +        return new Callable<Integer>() {
    +
    +            @Override
    +            public Integer call() throws Exception {
    +                monitored(name);
    +                return new Integer(0);
    --- End diff --
    
    There is not really anything to return, so I'll replace Callable with Runnable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56059405
  
    --- Diff: samples/topology/src/main/java/quarks/samples/topology/JobPollingSample.java ---
    @@ -0,0 +1,130 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.samples.topology;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +import quarks.execution.Job;
    +import quarks.execution.JobRegistryService;
    +import quarks.providers.development.DevelopmentProvider;
    +import quarks.runtime.etiao.JobRegistry;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +
    +/**
    + * Job monitoring by polling job state. 
    + * <p>
    + * Demonstrates job monitoring using the {@link JobRegistryService} service.
    + * The example starts a system monitoring application then concurrently 
    + * submits two jobs.  The monitoring application is using a polling source
    + * to periodically scan the job registry and generates tuples containing the 
    + * current state of registered jobs. Tuples are pushed to a sink which prints
    + * them onto the system output.
    + */
    +public class JobPollingSample {
    +    private final DevelopmentProvider dtp;
    +    
    +    public static void main(String[] args) throws Exception {
    +        
    +        JobPollingSample app = new JobPollingSample();
    +
    +        // Start monitoring app
    +        app.monitor();
    +
    +        Thread.sleep(2000);
    +
    +        // Asynchronously start two jobs
    --- End diff --
    
    Maybe some more comments here as to why you are starting jobs this way.
    
    Is there any benefit to using a service here, wouldn't inline sleeps do the same thing?
    
    Just that people will copy samples into real apps and assume that sample code is the correct way implement things.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Restart topology on ...

Posted by ddebrunner <gi...@git.apache.org>.
Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/9#discussion_r56055411
  
    --- Diff: samples/topology/src/main/java/quarks/samples/topology/JobPollingSample.java ---
    @@ -0,0 +1,130 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.samples.topology;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +import quarks.execution.Job;
    +import quarks.execution.JobRegistryService;
    +import quarks.providers.development.DevelopmentProvider;
    +import quarks.runtime.etiao.JobRegistry;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +
    +/**
    + * Job monitoring by polling job state. 
    + * <p>
    + * Demonstrates job monitoring using the {@link JobRegistryService} service.
    + * The example starts a system monitoring application then concurrently 
    + * submits two jobs.  The monitoring application is using a polling source
    + * to periodically scan the job registry and generates tuples containing the 
    + * current state of registered jobs. Tuples are pushed to a sink which prints
    + * them onto the system output.
    + */
    +public class JobPollingSample {
    +    private final DevelopmentProvider dtp;
    +    
    +    public static void main(String[] args) throws Exception {
    +        
    +        JobPollingSample app = new JobPollingSample();
    +
    +        // Start monitoring app
    +        app.monitor();
    +
    +        Thread.sleep(2000);
    +
    +        // Asynchronously start two jobs
    +        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    +        executor.schedule(app.getCallable("Monitored1"), 100, TimeUnit.MILLISECONDS);
    +        executor.schedule(app.getCallable("Monitored2"), 3300, TimeUnit.MILLISECONDS);
    +    }
    +
    +    JobPollingSample() throws Exception {
    +        this.dtp = new DevelopmentProvider();
    +        dtp.getServices().addService(JobRegistryService.class, new JobRegistry());
    +    }
    +
    +    void monitored(String name) throws InterruptedException, ExecutionException {
    +        Topology t = dtp.newTopology(name);
    +        
    +        Random r = new Random();
    +        TStream<Double> d  = t.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
    +        d.sink(tuple -> System.out.print("."));
    +
    +        Thread.sleep(2000);
    +        Future<Job> f = dtp.submit(t);
    +        Job job = f.get();
    +        Thread.sleep(5000);
    +        job.stateChange(Job.Action.CLOSE);
    +        Thread.sleep(2000);
    +
    +        provider().getServices().getService(JobRegistryService.class).removeJob(job.getId());
    +    }
    +    
    +    /**
    +     * Monitoring application polls the job registry every 1 sec.
    +     */
    +    void monitor() {
    +        Topology t = dtp.newTopology("Monitor");
    +
    +        TStream<Job.State[]> state = t.poll(() -> {
    +                JobRegistryService jobs = provider().getServices().getService(JobRegistryService.class);
    +                List<Job.State> states = new ArrayList<>();
    +                if (jobs != null) {
    +                    for (String id: jobs.getJobIds()) {
    +                        states.add(jobs.getJob(id).getCurrentState());
    +                    }
    +                }
    +                return states.toArray(new Job.State[0]);
    +            }, 1, TimeUnit.SECONDS);
    +
    +        state.sink(states -> {
    +                StringBuffer sb = new StringBuffer();
    +                for (Job.State s : states) {
    +                    sb.append(s).append(',');
    +                }
    +                System.out.println(sb.toString());
    +            });
    +        
    +        dtp.submit(t);
    +    }
    +    
    +    private DevelopmentProvider provider() {
    +        return dtp;
    +    }
    +    
    +    private Callable<Integer> getCallable(String name) {
    +        return new Callable<Integer>() {
    +
    +            @Override
    +            public Integer call() throws Exception {
    +                monitored(name);
    +                return new Integer(0);
    --- End diff --
    
    Just "return 0"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quarks pull request: QUARKS-8 [WIP] Job registry service

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-quarks/pull/9


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---