You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/10/18 23:23:02 UTC

[sling-org-apache-sling-jobs-it-services] 01/07: SLING-5645 moved jobs out of examples

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jobs-it-services.git

commit 8df384053b49dd8672947bdc03de3a2d47e1fef8
Author: Ian Boston <ie...@apache.org>
AuthorDate: Mon Oct 3 16:05:02 2016 +0000

    SLING-5645 moved jobs out of examples
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1763181 13f79535-47bb-0310-9956-ffa450edef68
---
 pom.xml                                            | 117 +++++++++++++++++
 .../sling/jobs/it/services/AsyncJobConsumer.java   | 142 +++++++++++++++++++++
 .../sling/jobs/it/services/FullySyncJob.java       |  68 ++++++++++
 .../jobs/it/services/JobManagerTestComponent.java  |  72 +++++++++++
 testlaunch.jsp                                     |  44 +++++++
 5 files changed, 443 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f8dc191
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>26</version>
+        <relativePath />
+    </parent>
+
+    <artifactId>org.apache.sling.jobs-it-services</artifactId>
+    <packaging>bundle</packaging>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>Apache Sling Jobs Service Integration Tests Bundle</name>
+    <description>
+        Integration tests for the Jobs implementation
+    </description>
+
+    <scm>
+        <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jobs/it-services</connection>
+        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jobs/it-services</developerConnection>
+        <url>http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jobs/it-services</url>
+    </scm>
+
+    <properties>
+        <site.jira.version.id>12315369</site.jira.version.id>
+        <sling.java.version>7</sling.java.version>
+        <exam.version>4.4.0</exam.version>
+        <url.version>2.4.5</url.version>
+        <bundle.build.dir>${basedir}/target</bundle.build.dir>
+        <bundle.file.name>${bundle.build.dir}/${project.build.finalName}.jar</bundle.file.name>
+        <min.port>37000</min.port>
+        <max.port>37999</max.port>
+    </properties>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Export-Package></Export-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>
+                    </excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.jobs</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>2.0.1</version>
+        </dependency>
+      <!-- Testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java b/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java
new file mode 100644
index 0000000..99d22f1
--- /dev/null
+++ b/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.it.services;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jobs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * This job consumer consumes jobs from the job subsystem. It accepts the jobs into a queue and uses a thread pool to drain the queue.
+ * If the queue fills up, jobs are returned back to the jobsystem without being accepted. The size of the queue, the number of threads and
+ * the maximum number of threads should be tuned for maximum throughput at an acceptable resource usage level. Retuning the consumer
+ * will cause the queue to drain and restart.
+ *
+ * The contract this component makes with the JobSystem is that it will make best efforts to ensure that jobs it accepts into its queue are executed.
+ *
+ */
+@Component(immediate = true)
+@Properties({
+        @Property(name = JobConsumer.JOB_TYPES, cardinality = Integer.MAX_VALUE, value = {
+                AsyncJobConsumer.JOB_TYPE
+        })
+})
+@Service(value = JobConsumer.class)
+public class AsyncJobConsumer implements JobConsumer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncJobConsumer.class);
+
+
+    public static final String JOB_TYPE = "treadding/asyncthreadpoolwithbacklog";
+
+    /**
+     * The core number of threads that can be used to run this job. This should be just large enough to ensure
+     * throughput without being so large as to impact other operations. Probably 1/2 the number of cores is a good
+     * starting point.
+     */
+    @Property(intValue = 4)
+    private static final String CORE_THREAD_POOL_SIZE = "core-thread-pool-size";
+    /**
+     * The maximum number of threads allocated to running this job. This should not be so large that it can
+     * create availability issues for the JVM, but large enough to clear the backlog before it experiences
+     * inefficiency due to overflow.
+     */
+    @Property(intValue = 8)
+    private static final String MAC_THREAD_POOL_SIZE = "max-thread-pool-size";
+
+    /**
+     * This defines how many messages the component can queue for execution dequeing from the
+     * Job queue. This should be just large enough to ensure that the executing threads are kept busy
+     * but small enough to ensure that the shutdown is not blocked. Once into the queue there is some
+     * impression that the jobs will be executed as they have been dequeued from the message system.
+     * The deactivate will wait for the shutdown wait time, and then shut the queue down.
+     */
+    @Property(intValue = 8)
+    private static final String MAX_QUEUED_BACKLOG = "max-queue-backlog";
+
+    /**
+     * This is the maximum time allowed to shut the queue down. It should be long enough to ensure that all jobs in
+     * the local queue can complete. The longer the local queue set in max-queue-backlog, the higher this value must be.
+     */
+    @Property(longValue = 30)
+    private static final String SHUTDOWN_WAIT_SECONDS = "max-shutdown-wait";
+
+    private ExecutorService executor;
+    private LinkedBlockingQueue<Runnable> workQueue;
+    private long shutdownWaitSeconds;
+
+    @Activate
+    public void activate(Map<String, Object> properites) {
+        int corePoolSize = (int) properites.get(CORE_THREAD_POOL_SIZE);
+        int maxPoolSize = (int) properites.get(MAC_THREAD_POOL_SIZE);
+        int maxBacklog = (int) properites.get(MAX_QUEUED_BACKLOG);
+        shutdownWaitSeconds = (long) properites.get(SHUTDOWN_WAIT_SECONDS);
+        workQueue = new LinkedBlockingQueue<Runnable>(maxBacklog);
+        executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, workQueue);
+    }
+
+    @Deactivate
+    public void deactivate(Map<String, Object> properties) {
+        try {
+            executor.awaitTermination(shutdownWaitSeconds, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("Interrupted while waiting for queue to drain ",e);
+        }
+        executor.shutdown();
+    }
+
+    @Nonnull
+    @Override
+    public void execute(@Nonnull final Job initialState, @Nonnull final JobUpdateListener listener, @Nonnull final JobCallback callback) {
+        LOGGER.info("Got request to start job {} ", initialState);
+        initialState.setState(Job.JobState.QUEUED);
+        listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build());
+        // if the Job cant be queued locally, a RejectedExecutionException will be thrown, back to the scheduler and the job message will be put back into the queue to be retried some time later.
+        executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                initialState.setState(Job.JobState.ACTIVE);
+                listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build());
+                // DO some work here.
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOGGER.debug(e.getMessage(), e);
+                }
+                listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step2").build());
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOGGER.debug(e.getMessage(), e);
+                }
+                initialState.setState(Job.JobState.SUCCEEDED);
+                listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step3").build());
+                callback.callback(initialState);
+                return null;
+            }
+        });
+    }
+}
diff --git a/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java b/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java
new file mode 100644
index 0000000..0221ff6
--- /dev/null
+++ b/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.it.services;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jobs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ */
+@Component(immediate = true)
+@Properties({
+        @Property(name = JobConsumer.JOB_TYPES, cardinality = Integer.MAX_VALUE, value = {
+                FullySyncJob.JOB_TYPE
+        })
+})
+@Service(value = JobConsumer.class)
+public class FullySyncJob implements JobConsumer {
+
+
+    public static final String JOB_TYPE = "treadding/inthreadoperation";
+    private static final Logger LOGGER = LoggerFactory.getLogger(FullySyncJob.class);
+
+    @Nonnull
+    @Override
+    public void execute(@Nonnull Job initialState, @Nonnull JobUpdateListener listener, @Nonnull JobCallback callback) {
+        LOGGER.info("Got request to start job {} ", initialState);
+        initialState.setState(Job.JobState.ACTIVE);
+        listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build());
+
+        // DO some work here.
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+        listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step2").build());
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            LOGGER.debug(e.getMessage(),e);
+        }
+        initialState.setState(Job.JobState.SUCCEEDED);
+        listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step3").build());
+        callback.callback(initialState);
+    }
+}
diff --git a/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java b/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java
new file mode 100644
index 0000000..ffba8ac
--- /dev/null
+++ b/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.it.services;
+
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.jobs.Job;
+import org.apache.sling.jobs.JobManager;
+import org.apache.sling.jobs.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ */
+@Component(immediate = true)
+public class JobManagerTestComponent  {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerTestComponent.class);
+    public static final String TOPIC = "org/apache/sling/jobs/it/services";
+    @Reference
+    private JobManager jobManager;
+
+
+    @Activate
+    public void activate(Map<String,Object> props) {
+        for( int i = 0; i < 10; i++) {
+            Job job = jobManager.newJobBuilder(Types.jobQueue(TOPIC), Types.jobType(AsyncJobConsumer.JOB_TYPE)).addProperties(
+                    ImmutableMap.of("jobtest", (Object) "jobtest")).add();
+            assertNotNull(job);
+            LOGGER.info("Started Job {} ", job.getId());
+        }
+        // then start 10 sync jobs.
+        for( int i = 0; i < 10; i++) {
+            Job job = jobManager.newJobBuilder(Types.jobQueue(TOPIC), Types.jobType(FullySyncJob.JOB_TYPE)).addProperties(
+                    ImmutableMap.of("jobtest", (Object) "jobtest")).add();
+            assertNotNull(job);
+            LOGGER.info("Started Job {} ", job.getId());
+        }
+    }
+
+    @Deactivate
+    public void deactivate(Map<String, Object> props) {
+
+    }
+
+
+}
diff --git a/testlaunch.jsp b/testlaunch.jsp
new file mode 100644
index 0000000..6f11d2b
--- /dev/null
+++ b/testlaunch.jsp
@@ -0,0 +1,44 @@
+<%--
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+--%><%@include file="/libs/foundation/global.jsp"%><%
+%><%@page session="false" contentType="text/html; charset=utf-8"
+	pageEncoding="UTF-8"
+    import="org.apache.sling.api.resource.*,
+    java.util.*,
+    javax.jcr.*,
+    com.day.cq.search.*,
+    com.day.cq.wcm.api.*,
+    com.day.cq.dam.api.*,
+    org.apache.sling.jobs.*,
+    com.google.common.collect.*"%><%
+
+    // This is an AEM Fiddle that runs some jobs.
+
+    JobManager jobManager = sling.getService(JobManager.class);
+    for ( int i = 0; i < 100; i++ ) {
+        Job job = jobManager.newJobBuilder(
+                Types.jobQueue("org/apache/sling/jobs/it/services"),
+                Types.jobType("treadding/asyncthreadpoolwithbacklog"))
+            .addProperties(
+                        ImmutableMap.of("jobtest", (Object) "jobtest"))
+            .add();
+%>Added Job <%= job.getId() %><br/><%
+    }
+%>
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.