You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/10/18 23:23:02 UTC
[sling-org-apache-sling-jobs-it-services] 01/07: SLING-5645 moved
jobs out of examples
This is an automated email from the ASF dual-hosted git repository.
rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jobs-it-services.git
commit 8df384053b49dd8672947bdc03de3a2d47e1fef8
Author: Ian Boston <ie...@apache.org>
AuthorDate: Mon Oct 3 16:05:02 2016 +0000
SLING-5645 moved jobs out of examples
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1763181 13f79535-47bb-0310-9956-ffa450edef68
---
pom.xml | 117 +++++++++++++++++
.../sling/jobs/it/services/AsyncJobConsumer.java | 142 +++++++++++++++++++++
.../sling/jobs/it/services/FullySyncJob.java | 68 ++++++++++
.../jobs/it/services/JobManagerTestComponent.java | 72 +++++++++++
testlaunch.jsp | 44 +++++++
5 files changed, 443 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f8dc191
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>sling</artifactId>
+ <version>26</version>
+ <relativePath />
+ </parent>
+
+ <artifactId>org.apache.sling.jobs-it-services</artifactId>
+ <packaging>bundle</packaging>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <name>Apache Sling Jobs Service Integration Tests Bundle</name>
+ <description>
+ Integration tests for the Jobs implementation
+ </description>
+
+ <scm>
+ <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jobs/it-services</connection>
+ <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jobs/it-services</developerConnection>
+ <url>http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jobs/it-services</url>
+ </scm>
+
+ <properties>
+ <site.jira.version.id>12315369</site.jira.version.id>
+ <sling.java.version>7</sling.java.version>
+ <exam.version>4.4.0</exam.version>
+ <url.version>2.4.5</url.version>
+ <bundle.build.dir>${basedir}/target</bundle.build.dir>
+ <bundle.file.name>${bundle.build.dir}/${project.build.finalName}.jar</bundle.file.name>
+ <min.port>37000</min.port>
+ <max.port>37999</max.port>
+ </properties>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Export-Package></Export-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <excludePackageNames>
+ </excludePackageNames>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.jobs</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>2.0.1</version>
+ </dependency>
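+ <!-- Testing: junit is "provided" rather than "test" because JobManagerTestComponent (main code) uses org.junit.Assert -->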
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java b/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java
new file mode 100644
index 0000000..99d22f1
--- /dev/null
+++ b/src/main/java/org/apache/sling/jobs/it/services/AsyncJobConsumer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.it.services;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jobs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * This job consumer consumes jobs from the job subsystem. It accepts the jobs into a bounded local queue and
+ * uses a thread pool to drain that queue.
+ * <p>
+ * If the queue fills up, jobs are returned to the job subsystem without being accepted. The size of the queue,
+ * the core number of threads and the maximum number of threads should be tuned for maximum throughput at an
+ * acceptable resource usage level. Retuning the consumer will cause the queue to drain and restart.
+ * <p>
+ * The contract this component makes with the job subsystem is that it will make best efforts to ensure that
+ * jobs it accepts into its queue are executed.
+ */
+@Component(immediate = true)
+@Properties({
+    @Property(name = JobConsumer.JOB_TYPES, cardinality = Integer.MAX_VALUE, value = {
+        AsyncJobConsumer.JOB_TYPE
+    })
+})
+@Service(value = JobConsumer.class)
+public class AsyncJobConsumer implements JobConsumer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncJobConsumer.class);
+
+    /**
+     * The job type this consumer is registered for. The spelling is part of the identifier that job producers
+     * use, so it must not be "corrected" without updating every producer of this job type.
+     */
+    public static final String JOB_TYPE = "treadding/asyncthreadpoolwithbacklog";
+
+    /**
+     * The core number of threads that can be used to run this job. This should be just large enough to ensure
+     * throughput without being so large as to impact other operations. Probably 1/2 the number of cores is a good
+     * starting point.
+     */
+    @Property(intValue = 4)
+    private static final String CORE_THREAD_POOL_SIZE = "core-thread-pool-size";
+
+    /**
+     * The maximum number of threads allocated to running this job. This should not be so large that it can
+     * create availability issues for the JVM, but large enough to clear the backlog before it experiences
+     * inefficiency due to overflow. Because the work queue is bounded, the pool only grows beyond the core
+     * size once the queue is full.
+     */
+    @Property(intValue = 8)
+    private static final String MAX_THREAD_POOL_SIZE = "max-thread-pool-size";
+
+    /**
+     * This defines how many jobs the component can hold for execution after dequeuing them from the
+     * job queue. This should be just large enough to ensure that the executing threads are kept busy
+     * but small enough to ensure that the shutdown is not blocked. Once in the local queue there is some
+     * expectation that the jobs will be executed, as they have been dequeued from the message system.
+     * The deactivate will stop accepting jobs and wait up to the shutdown wait time for the queue to drain.
+     */
+    @Property(intValue = 8)
+    private static final String MAX_QUEUED_BACKLOG = "max-queue-backlog";
+
+    /**
+     * This is the maximum time, in seconds, allowed to shut the queue down. It should be long enough to ensure
+     * that all jobs in the local queue can complete. The larger the local queue set in max-queue-backlog, the
+     * higher this value must be.
+     */
+    @Property(longValue = 30)
+    private static final String SHUTDOWN_WAIT_SECONDS = "max-shutdown-wait";
+
+    private ExecutorService executor;
+    private LinkedBlockingQueue<Runnable> workQueue;
+    private long shutdownWaitSeconds;
+
+    /**
+     * Reads the component configuration and creates the bounded work queue and the thread pool that drains it.
+     *
+     * @param properties the component configuration; must contain the four properties declared on this class
+     *                   with the types given by their {@code @Property} defaults (int, int, int, long).
+     */
+    @Activate
+    public void activate(Map<String, Object> properties) {
+        int corePoolSize = (int) properties.get(CORE_THREAD_POOL_SIZE);
+        int maxPoolSize = (int) properties.get(MAX_THREAD_POOL_SIZE);
+        int maxBacklog = (int) properties.get(MAX_QUEUED_BACKLOG);
+        shutdownWaitSeconds = (long) properties.get(SHUTDOWN_WAIT_SECONDS);
+        workQueue = new LinkedBlockingQueue<Runnable>(maxBacklog);
+        executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, workQueue);
+    }
+
+    /**
+     * Stops accepting new jobs and waits up to the configured shutdown wait time for the jobs that were
+     * already accepted to finish.
+     *
+     * @param properties the component configuration (unused).
+     */
+    @Deactivate
+    public void deactivate(Map<String, Object> properties) {
+        // shutdown() must be requested before awaiting: an executor that has not been shut down never
+        // reaches the terminated state, so awaitTermination() would always block for the full timeout.
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(shutdownWaitSeconds, TimeUnit.SECONDS)) {
+                // Already accepted jobs are left to run to completion rather than being cancelled.
+                LOGGER.warn("Local job queue did not drain within {} seconds ", shutdownWaitSeconds);
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("Interrupted while waiting for queue to drain ",e);
+            // Preserve the interrupt for the thread that is deactivating the component.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Marks the job as queued, reports that to the listener and hands the actual work to the thread pool.
+     * The submitted task marks the job active, simulates two steps of work, marks the job succeeded and
+     * finally invokes the callback.
+     *
+     * @param initialState the job to run.
+     * @param listener     receives an update for every step of the job.
+     * @param callback     invoked from the worker thread once the job has been marked as succeeded.
+     * @throws RejectedExecutionException if the job cannot be queued locally (queue full or consumer
+     *                                    deactivated); per the original design the scheduler is expected to
+     *                                    put the job message back into the queue to be retried later.
+     */
+    @Override
+    public void execute(@Nonnull final Job initialState, @Nonnull final JobUpdateListener listener, @Nonnull final JobCallback callback) {
+        LOGGER.info("Got request to start job {} ", initialState);
+        initialState.setState(Job.JobState.QUEUED);
+        listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build());
+        // If the job cannot be queued locally, a RejectedExecutionException is thrown back to the scheduler.
+        executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                initialState.setState(Job.JobState.ACTIVE);
+                // NOTE(review): "step1" is reported a second time here, as on queueing - confirm this is intended.
+                listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build());
+
+                // Do some work here; the sleeps stand in for real processing.
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOGGER.debug(e.getMessage(), e);
+                }
+                listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step2").build());
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOGGER.debug(e.getMessage(), e);
+                }
+                initialState.setState(Job.JobState.SUCCEEDED);
+                listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step3").build());
+                callback.callback(initialState);
+                return null;
+            }
+        });
+    }
+}
diff --git a/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java b/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java
new file mode 100644
index 0000000..0221ff6
--- /dev/null
+++ b/src/main/java/org/apache/sling/jobs/it/services/FullySyncJob.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.it.services;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jobs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ */
+@Component(immediate = true)
+@Properties({
+ @Property(name = JobConsumer.JOB_TYPES, cardinality = Integer.MAX_VALUE, value = {
+ FullySyncJob.JOB_TYPE
+ })
+})
+@Service(value = JobConsumer.class)
+public class FullySyncJob implements JobConsumer {
+
+
+ public static final String JOB_TYPE = "treadding/inthreadoperation";
+ private static final Logger LOGGER = LoggerFactory.getLogger(FullySyncJob.class);
+
+ @Nonnull
+ @Override
+ public void execute(@Nonnull Job initialState, @Nonnull JobUpdateListener listener, @Nonnull JobCallback callback) {
+ LOGGER.info("Got request to start job {} ", initialState);
+ initialState.setState(Job.JobState.ACTIVE);
+ listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step1").build());
+
+ // DO some work here.
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOGGER.debug(e.getMessage(), e);
+ }
+ listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step2").build());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOGGER.debug(e.getMessage(),e);
+ }
+ initialState.setState(Job.JobState.SUCCEEDED);
+ listener.update(initialState.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.UPDATE_JOB).put("processing", "step3").build());
+ callback.callback(initialState);
+ }
+}
diff --git a/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java b/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java
new file mode 100644
index 0000000..ffba8ac
--- /dev/null
+++ b/src/main/java/org/apache/sling/jobs/it/services/JobManagerTestComponent.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.it.services;
+
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.jobs.Job;
+import org.apache.sling.jobs.JobManager;
+import org.apache.sling.jobs.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ */
+@Component(immediate = true)
+public class JobManagerTestComponent {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerTestComponent.class);
+ public static final String TOPIC = "org/apache/sling/jobs/it/services";
+ @Reference
+ private JobManager jobManager;
+
+
+ @Activate
+ public void activate(Map<String,Object> props) {
+ for( int i = 0; i < 10; i++) {
+ Job job = jobManager.newJobBuilder(Types.jobQueue(TOPIC), Types.jobType(AsyncJobConsumer.JOB_TYPE)).addProperties(
+ ImmutableMap.of("jobtest", (Object) "jobtest")).add();
+ assertNotNull(job);
+ LOGGER.info("Started Job {} ", job.getId());
+ }
+ // then start 10 sync jobs.
+ for( int i = 0; i < 10; i++) {
+ Job job = jobManager.newJobBuilder(Types.jobQueue(TOPIC), Types.jobType(FullySyncJob.JOB_TYPE)).addProperties(
+ ImmutableMap.of("jobtest", (Object) "jobtest")).add();
+ assertNotNull(job);
+ LOGGER.info("Started Job {} ", job.getId());
+ }
+ }
+
+ @Deactivate
+ public void deactivate(Map<String, Object> props) {
+
+ }
+
+
+}
diff --git a/testlaunch.jsp b/testlaunch.jsp
new file mode 100644
index 0000000..6f11d2b
--- /dev/null
+++ b/testlaunch.jsp
@@ -0,0 +1,44 @@
+<%--
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+--%><%@include file="/libs/foundation/global.jsp"%><%
+%><%@page session="false" contentType="text/html; charset=utf-8"
+ pageEncoding="UTF-8"
+ import="org.apache.sling.api.resource.*,
+ java.util.*,
+ javax.jcr.*,
+ com.day.cq.search.*,
+ com.day.cq.wcm.api.*,
+ com.day.cq.dam.api.*,
+ org.apache.sling.jobs.*,
+ com.google.common.collect.*"%><%
+
+ // This is an AEM Fiddle that runs some jobs.
+ // It adds 100 jobs through the JobManager service and prints the id of each job it added.
+
+ // presumably "sling" is the scripting helper made available by the included global.jsp - verify.
+ JobManager jobManager = sling.getService(JobManager.class);
+ for ( int i = 0; i < 100; i++ ) {
+ // The queue name is the same string as JobManagerTestComponent.TOPIC and the job type the same string
+ // as AsyncJobConsumer.JOB_TYPE; keep these literals in sync with those constants.
+ Job job = jobManager.newJobBuilder(
+ Types.jobQueue("org/apache/sling/jobs/it/services"),
+ Types.jobType("treadding/asyncthreadpoolwithbacklog"))
+ .addProperties(
+ ImmutableMap.of("jobtest", (Object) "jobtest"))
+ .add();
+%>Added Job <%= job.getId() %><br/><%
+ }
+%>
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.