You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@polygene.apache.org by ni...@apache.org on 2015/11/14 05:37:23 UTC
[1/6] zest-java git commit: Adding more information in Exceptions
dealing with concurrent modifications.
Repository: zest-java
Updated Branches:
refs/heads/develop b9e71e7b4 -> d85a93fe8
Adding more information in Exceptions dealing with concurrent modifications.
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/80284fb2
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/80284fb2
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/80284fb2
Branch: refs/heads/develop
Commit: 80284fb254e2498ba8c7589314e904ebad305f5f
Parents: b9e71e7
Author: Niclas Hedhman <ni...@hedhman.org>
Authored: Sat Nov 14 08:49:06 2015 +0800
Committer: Niclas Hedhman <ni...@hedhman.org>
Committed: Sat Nov 14 08:49:06 2015 +0800
----------------------------------------------------------------------
.../ConcurrentEntityModificationException.java | 7 +++-
.../unitofwork/concern/UnitOfWorkConcern.java | 39 +++++++++++++++++---
.../zest/runtime/entity/EntityInstance.java | 15 +++++++-
.../runtime/entity/EntityStateInstance.java | 13 ++++++-
.../runtime/unitofwork/UnitOfWorkInstance.java | 2 +-
...currentEntityStateModificationException.java | 20 +++++++++-
.../ConcurrentModificationCheckConcern.java | 11 +++---
.../ConcurrentUoWFileModificationException.java | 5 ++-
.../uowfile/internal/UoWFileFactory.java | 2 +-
9 files changed, 94 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/core/api/src/main/java/org/apache/zest/api/unitofwork/ConcurrentEntityModificationException.java
----------------------------------------------------------------------
diff --git a/core/api/src/main/java/org/apache/zest/api/unitofwork/ConcurrentEntityModificationException.java b/core/api/src/main/java/org/apache/zest/api/unitofwork/ConcurrentEntityModificationException.java
index 5d02845..e494f82 100644
--- a/core/api/src/main/java/org/apache/zest/api/unitofwork/ConcurrentEntityModificationException.java
+++ b/core/api/src/main/java/org/apache/zest/api/unitofwork/ConcurrentEntityModificationException.java
@@ -15,6 +15,7 @@
package org.apache.zest.api.unitofwork;
import org.apache.zest.api.entity.EntityComposite;
+import org.apache.zest.api.usecase.Usecase;
/**
* This exception is thrown by UnitOfWork.complete() if any entities that are being committed
@@ -27,9 +28,11 @@ public class ConcurrentEntityModificationException
private final Iterable<EntityComposite> concurrentlyModifiedEntities;
- public ConcurrentEntityModificationException( Iterable<EntityComposite> concurrentlyModifiedEntities )
+ public ConcurrentEntityModificationException( Iterable<EntityComposite> concurrentlyModifiedEntities,
+ Usecase usecase
+ )
{
- super("Entities changed concurrently :" + concurrentlyModifiedEntities);
+ super( "Entities changed concurrently, and detected in usecase '" + usecase + "'\nModified entities : " + concurrentlyModifiedEntities );
this.concurrentlyModifiedEntities = concurrentlyModifiedEntities;
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/core/api/src/main/java/org/apache/zest/api/unitofwork/concern/UnitOfWorkConcern.java
----------------------------------------------------------------------
diff --git a/core/api/src/main/java/org/apache/zest/api/unitofwork/concern/UnitOfWorkConcern.java b/core/api/src/main/java/org/apache/zest/api/unitofwork/concern/UnitOfWorkConcern.java
index 5791398..c79db85 100644
--- a/core/api/src/main/java/org/apache/zest/api/unitofwork/concern/UnitOfWorkConcern.java
+++ b/core/api/src/main/java/org/apache/zest/api/unitofwork/concern/UnitOfWorkConcern.java
@@ -18,6 +18,7 @@
package org.apache.zest.api.unitofwork.concern;
import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
import org.apache.zest.api.common.AppliesTo;
import org.apache.zest.api.concern.GenericConcern;
import org.apache.zest.api.injection.scope.Invocation;
@@ -66,6 +67,7 @@ public class UnitOfWorkConcern
{
if( module.isUnitOfWorkActive() )
{
+ //noinspection ConstantConditions
return next.invoke( proxy, method, args );
}
else
@@ -86,6 +88,7 @@ public class UnitOfWorkConcern
Usecase usecase = usecase();
return invokeWithCommit( proxy, method, args, module.newUnitOfWork( usecase ) );
}
+ //noinspection ConstantConditions
return next.invoke( proxy, method, args );
}
@@ -122,22 +125,31 @@ public class UnitOfWorkConcern
int retry = 0;
while( true )
{
+ //noinspection ConstantConditions
Object result = next.invoke( proxy, method, args );
try
{
currentUnitOfWork.complete();
return result;
}
- catch( ConcurrentEntityModificationException e )
+ catch( UndeclaredThrowableException e)
{
- if( retry >= maxTries )
+ Throwable undeclared = e.getUndeclaredThrowable();
+ if( undeclared instanceof ConcurrentEntityModificationException )
+ {
+ ConcurrentEntityModificationException ceme = (ConcurrentEntityModificationException) undeclared;
+ currentUnitOfWork = checkRetry( maxTries, delayFactor, initialDelay, retry, ceme );
+ retry++;
+ }
+ else
{
throw e;
}
- module.currentUnitOfWork().discard();
- Thread.sleep( initialDelay + retry * delayFactor );
+ }
+ catch( ConcurrentEntityModificationException e )
+ {
+ currentUnitOfWork = checkRetry( maxTries, delayFactor, initialDelay, retry, e );
retry++;
- currentUnitOfWork = module.newUnitOfWork( usecase() );
}
}
}
@@ -149,6 +161,23 @@ public class UnitOfWorkConcern
}
}
+ private UnitOfWork checkRetry( int maxTries,
+ long delayFactor,
+ long initialDelay,
+ int retry,
+ ConcurrentEntityModificationException e
+ )
+ throws ConcurrentEntityModificationException, InterruptedException
+ {
+ if( retry >= maxTries )
+ {
+ throw e;
+ }
+ module.currentUnitOfWork().discard();
+ Thread.sleep( initialDelay + retry * delayFactor );
+ return module.newUnitOfWork( usecase() );
+ }
+
/**
* Discard unit of work if the discard policy match.
*
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityInstance.java
----------------------------------------------------------------------
diff --git a/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityInstance.java b/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityInstance.java
index d30ff3f..3f55ba5 100755
--- a/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityInstance.java
+++ b/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityInstance.java
@@ -231,7 +231,14 @@ public final class EntityInstance
@Override
public String toString()
{
- return identity.toString();
+ if( Boolean.getBoolean( "zest.entity.print.state" ) )
+ {
+ return state.toString();
+ }
+ else
+ {
+ return identity.toString();
+ }
}
public void remove( UnitOfWork unitOfWork )
@@ -301,7 +308,11 @@ public final class EntityInstance
catch( ConstraintViolationException e )
{
List<Class<?>> entityModelList = entityModel.types().collect( toList() );
- throw new ConstraintViolationException( identity.identity(), entityModelList, e.mixinTypeName(), e.methodName(), e.constraintViolations() );
+ throw new ConstraintViolationException( identity.identity(),
+ entityModelList,
+ e.mixinTypeName(),
+ e.methodName(),
+ e.constraintViolations() );
}
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityStateInstance.java
----------------------------------------------------------------------
diff --git a/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityStateInstance.java b/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityStateInstance.java
index 9ab2326..0028c5a 100644
--- a/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityStateInstance.java
+++ b/core/runtime/src/main/java/org/apache/zest/runtime/entity/EntityStateInstance.java
@@ -20,10 +20,12 @@
package org.apache.zest.runtime.entity;
import java.lang.reflect.AccessibleObject;
+import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.zest.api.association.Association;
import org.apache.zest.api.association.AssociationStateHolder;
@@ -39,7 +41,6 @@ import org.apache.zest.runtime.association.ManyAssociationInstance;
import org.apache.zest.runtime.association.ManyAssociationModel;
import org.apache.zest.runtime.association.NamedAssociationInstance;
import org.apache.zest.runtime.association.NamedAssociationModel;
-import org.apache.zest.runtime.composite.ConstraintsCheck;
import org.apache.zest.runtime.property.PropertyModel;
import org.apache.zest.runtime.unitofwork.BuilderEntityState;
import org.apache.zest.spi.entity.EntityState;
@@ -225,4 +226,14 @@ public final class EntityStateInstance
return state;
}
+
+ @Override
+ public String toString()
+ {
+ return "EntityState[" + state.entrySet().stream()
+ .map( entry -> ((Method) entry.getKey()).getName() + "=" + entry.getValue())
+ .collect( Collectors.joining("\n ", " ", "\n") )
+ + "]"
+ ;
+ }
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/core/runtime/src/main/java/org/apache/zest/runtime/unitofwork/UnitOfWorkInstance.java
----------------------------------------------------------------------
diff --git a/core/runtime/src/main/java/org/apache/zest/runtime/unitofwork/UnitOfWorkInstance.java b/core/runtime/src/main/java/org/apache/zest/runtime/unitofwork/UnitOfWorkInstance.java
index 6f404e7..e9ea72c 100755
--- a/core/runtime/src/main/java/org/apache/zest/runtime/unitofwork/UnitOfWorkInstance.java
+++ b/core/runtime/src/main/java/org/apache/zest/runtime/unitofwork/UnitOfWorkInstance.java
@@ -371,7 +371,7 @@ public final class UnitOfWorkInstance
.filter( instance -> instance.identity().equals( modifiedEntityIdentity ) )
.forEach( instance -> modifiedEntities.add( instance.<EntityComposite>proxy() ) );
}
- throw new ConcurrentEntityModificationException( modifiedEntities );
+ throw new ConcurrentEntityModificationException( modifiedEntities, ( (ConcurrentEntityStateModificationException) e ).getUsecase() );
}
else
{
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentEntityStateModificationException.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentEntityStateModificationException.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentEntityStateModificationException.java
index fe86c78..b6716ed 100644
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentEntityStateModificationException.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentEntityStateModificationException.java
@@ -18,6 +18,7 @@ package org.apache.zest.spi.entitystore;
import java.util.Collection;
import org.apache.zest.api.entity.EntityReference;
+import org.apache.zest.api.usecase.Usecase;
/**
* This exception should be thrown if the EntityStore detects that the entities being saved have been changed
@@ -27,10 +28,11 @@ public class ConcurrentEntityStateModificationException
extends EntityStoreException
{
private Collection<EntityReference> modifiedEntities;
+ private Usecase usecase;
public ConcurrentEntityStateModificationException( Collection<EntityReference> modifiedEntities )
{
- super("Entities changed concurrently:" + modifiedEntities);
+ super();
this.modifiedEntities = modifiedEntities;
}
@@ -38,4 +40,20 @@ public class ConcurrentEntityStateModificationException
{
return modifiedEntities;
}
+
+ @Override
+ public String getMessage() // built on demand so that a usecase assigned later through setUsecase() is included
+ {
+ return "Entities changed concurrently. Changes detected in usecase '" + usecase + "'\nModified entities are;\n" + modifiedEntities;
+ }
+
+ public Usecase getUsecase()
+ {
+ return usecase;
+ }
+
+ public void setUsecase( Usecase usecase )
+ {
+ this.usecase = usecase;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentModificationCheckConcern.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentModificationCheckConcern.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentModificationCheckConcern.java
index 5ae7b56..5ed5f91 100755
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentModificationCheckConcern.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/ConcurrentModificationCheckConcern.java
@@ -14,8 +14,7 @@
package org.apache.zest.spi.entitystore;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.zest.api.ZestAPI;
import org.apache.zest.api.concern.ConcernOf;
@@ -59,10 +58,9 @@ public abstract class ConcurrentModificationCheckConcern
{
private final EntityStoreUnitOfWork uow;
private EntityStateVersions versions;
- private ModuleSpi module;
private long currentTime;
- private List<EntityState> loaded = new ArrayList<>();
+ private HashSet<EntityState> loaded = new HashSet<>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -73,7 +71,6 @@ public abstract class ConcurrentModificationCheckConcern
{
this.uow = uow;
this.versions = versions;
- this.module = module;
this.currentTime = currentTime;
}
@@ -134,6 +131,10 @@ public abstract class ConcurrentModificationCheckConcern
catch( EntityStoreException e )
{
lock.writeLock().unlock();
+ if( e instanceof ConcurrentEntityStateModificationException )
+ {
+ ((ConcurrentEntityStateModificationException) e).setUsecase( usecase() );
+ }
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/ConcurrentUoWFileModificationException.java
----------------------------------------------------------------------
diff --git a/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/ConcurrentUoWFileModificationException.java b/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/ConcurrentUoWFileModificationException.java
index 0d8be4e..c1d8fe6 100644
--- a/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/ConcurrentUoWFileModificationException.java
+++ b/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/ConcurrentUoWFileModificationException.java
@@ -20,15 +20,16 @@ package org.apache.zest.library.uowfile.internal;
import java.util.Collections;
import org.apache.zest.api.entity.EntityComposite;
import org.apache.zest.api.unitofwork.ConcurrentEntityModificationException;
+import org.apache.zest.api.usecase.Usecase;
public class ConcurrentUoWFileModificationException
extends ConcurrentEntityModificationException
{
private final Iterable<UoWFile> concurrentlyModifiedFiles;
- ConcurrentUoWFileModificationException( Iterable<UoWFile> concurrentlyModifiedFiles )
+ ConcurrentUoWFileModificationException( Iterable<UoWFile> concurrentlyModifiedFiles, Usecase usecase )
{
- super( Collections.<EntityComposite>emptyList() );
+ super( Collections.<EntityComposite>emptyList(), usecase );
this.concurrentlyModifiedFiles = concurrentlyModifiedFiles;
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/80284fb2/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/UoWFileFactory.java
----------------------------------------------------------------------
diff --git a/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/UoWFileFactory.java b/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/UoWFileFactory.java
index f17e83a..3dcb80a 100644
--- a/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/UoWFileFactory.java
+++ b/libraries/uowfile/src/main/java/org/apache/zest/library/uowfile/internal/UoWFileFactory.java
@@ -155,7 +155,7 @@ public interface UoWFileFactory
}
if( !concurrentlyModified.isEmpty() )
{
- throw new ConcurrentUoWFileModificationException( concurrentlyModified );
+ throw new ConcurrentUoWFileModificationException( concurrentlyModified, uow.usecase() );
}
}
}
[6/6] zest-java git commit: ZEST-128 - Added overrun detection and
counting.
Posted by ni...@apache.org.
ZEST-128 - Added overrun detection and counting.
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/d85a93fe
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/d85a93fe
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/d85a93fe
Branch: refs/heads/develop
Commit: d85a93fe83f230ed12eccb40ca131d78fe05a198
Parents: 5d51f37
Author: Niclas Hedhman <ni...@hedhman.org>
Authored: Sat Nov 14 12:37:05 2015 +0800
Committer: Niclas Hedhman <ni...@hedhman.org>
Committed: Sat Nov 14 12:37:05 2015 +0800
----------------------------------------------------------------------
libraries/scheduler/src/docs/scheduler.txt | 51 ++++++++++++++-----
.../zest/library/scheduler/Execution.java | 1 -
.../scheduler/SchedulerConfiguration.java | 5 --
.../zest/library/scheduler/TaskRunner.java | 52 +++++++++++++-------
.../library/scheduler/schedule/Schedule.java | 7 ++-
.../zest/library/scheduler/SchedulerTest.java | 3 +-
.../scheduler/docsupport/SchedulerDocs.java | 44 ++++++++++++-----
7 files changed, 113 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zest-java/blob/d85a93fe/libraries/scheduler/src/docs/scheduler.txt
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/docs/scheduler.txt b/libraries/scheduler/src/docs/scheduler.txt
index 2a42cbe..a5ab18d 100644
--- a/libraries/scheduler/src/docs/scheduler.txt
+++ b/libraries/scheduler/src/docs/scheduler.txt
@@ -25,7 +25,7 @@
source=libraries/scheduler/dev-status.xml
--------------
-The Scheduler library provides an easy way to schedule tasks using cron expressions if needed.
+The Scheduler library provides an easy way to schedule tasks either for one time execution, CRON expression intervals or a custom algorithm.
An optional Timeline allows you to browse past and future task runs.
@@ -39,7 +39,7 @@ The SLF4J Logger used by this library is named "org.apache.zest.library.schedule
Use SchedulerAssembler to add the Scheduler service to your Application. This
Assembler provide a fluent api to programmatically configure configuration defaults and activate the
-Timeline service assembly that allow to browse in past and future Task runs.
+Timeline service assembly that allows browsing of past and future Task runs.
Here is a full example:
@@ -71,12 +71,12 @@ source=libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.
tag=task
----
-Tasks have a mandatory name property and an optional tags property. Theses properties get copied in
+Tasks have a mandatory name property and an optional tags property. These properties get copied in
each TimelineRecord created when the Timeline feature is activated.
The run() method of Tasks is wrapped in a UnitOfWork when called by the Scheduler.
Thanks to the UnitOfWork handling in Zest, you can split the work done in your Tasks in
-several UnitOfWorks, the one around the Task#run() invocation will then be paused.
+several UnitOfWorks. See UnitOfWork strategy below.
Here is a simple example:
@@ -92,11 +92,11 @@ Tasks are scheduled using the Scheduler service. This creates a Schedule associa
the Task that allows you to know if it is running, to change it's cron expression and set it's
durability.
-By default, a Schedule is not durable. In other words, it do not survive an Application
-restart. To make a Schedule durable, set it's durable property to true once its scheduled.
+All Schedules are durable. In other words, they will survive an Application restart, and your application should
+not schedule them again, as the Schedules are loaded when the SchedulerService is activated after bootstrap.
-There are two ways to schedule a Task using the Scheduler service: once or with a cron
-expression.
+There are three ways to schedule a Task using the Scheduler service: once, with a cron
+expression, or by providing your own Schedule instance.
=== Scheduling once ===
@@ -108,7 +108,8 @@ source=libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsu
tag=2
-----------
-Note that there is no point in making such a Schedule durable because it won't be run repeatedly.
+Since all Schedules are durable, this "once" can be far into the future, and still be executed if the application
+has been restarted.
=== Scheduling using a cron expression ===
@@ -137,11 +138,37 @@ To sum up, cron expressions used here have a precision of one second. The follow
- @annualy or @yearly
+== Overrun ==
+If the Schedule is running when it is time to be executed, then the execution will be skipped. This means that
+the Task must complete within its period, or executions will be skipped. The side effect of that is that this
+reduces thread exhaustion.
+
+When the Task execution is skipped, the overrun() property on the Schedule is incremented by 1.
+
== Durability ==
-Schedules can either be ethereal or durable, passed as an argument to the +Scheduler+. If it is a durable
-schedule, then the Task must be an Entity Composite.
+All Schedules are durable and the Task must be an Entity Composite. It also means that Tasks should be scheduled
+once and not on each reboot. The SchedulerService will load all Schedules on activation.
+
+While the Task is running, the Schedule will be held in the UnitOfWork of the TaskRunner. This means that IF the
+Schedule is updated, i.e. cancelled or directly manipulating Schedule properties, the UnitOfWork.complete() will fail.
+And if the Task is executing within the same UnitOfWork, any changes made will not take place.
+
+== UnitOfWork strategy ==
+The TaskRunner creates a UnitOfWork and the Task is executed within that UnitOfWork. This may be very convenient, but
+as noted in Durability above, that UnitOfWork will fail if Schedule properties are updated while the Task is
+running. To prevent the Task's operations from suffering from this, OR if the Task wants a Retry/DiscardOn strategy
+different from the default one, the Task can simply declare its own, such as:
+
+[snippet,java]
+-----------
+source=libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
+tag=strategy
+-----------
+
+== Custom Schedules ==
+It is possible to implement Schedule directly. It must be declared as an EntityComposite in the assembly, and be
+visible from the SchedulerService. No other considerations should be necessary.
-When the
== Observing the Timeline ==
Timeline allow to browse in past and future Task runs. This feature is available only if you activate
http://git-wip-us.apache.org/repos/asf/zest-java/blob/d85a93fe/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
index 8aecbe5..3bc120c 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
@@ -218,7 +218,6 @@ public interface Execution
public void stop()
throws Exception
{
-
running = false;
synchronized( this )
{
http://git-wip-us.apache.org/repos/asf/zest-java/blob/d85a93fe/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
index 0ebc81d..66d7769 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
@@ -42,10 +42,5 @@ public interface SchedulerConfiguration
@Optional @UseDefaults
Property<Integer> workQueueSize();
- /**
- * @return If the scheduler must stop without waiting for running tasks, optional and defaults to false.
- */
- @UseDefaults
- Property<Boolean> stopViolently();
// END SNIPPET: configuration
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/d85a93fe/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
index f506129..9f5d772 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
@@ -25,7 +25,7 @@ import org.apache.zest.api.injection.scope.Structure;
import org.apache.zest.api.injection.scope.Uses;
import org.apache.zest.api.structure.Module;
import org.apache.zest.api.unitofwork.UnitOfWork;
-import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
+import org.apache.zest.api.usecase.UsecaseBuilder;
import org.apache.zest.library.scheduler.schedule.Schedule;
import org.apache.zest.library.scheduler.schedule.ScheduleTime;
@@ -39,31 +39,38 @@ public class TaskRunner
private ScheduleTime schedule;
@Override
- @UnitOfWorkPropagation( usecase = "Task Runner" )
public void run()
{
+ // TODO: (niclas) I am NOT happy with this implementation, requiring 3 UnitOfWorks to be created. 15-20 milliseconds on my MacBook. If there is a better way to detect overrun, two of those might not be needed.
try
{
- UnitOfWork uow = module.currentUnitOfWork();
+ UnitOfWork uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner initialize" ) );
Schedule schedule = uow.get( Schedule.class, this.schedule.scheduleIdentity() );
- Task task = schedule.task().get();
- try
+ if( !schedule.running().get() ) // check for overrun.
{
- schedule.taskStarting();
- task.run();
- schedule.taskCompletedSuccessfully();
- }
- catch( RuntimeException ex )
- {
- Throwable exception = ex;
- while(exception instanceof UndeclaredThrowableException)
+ try
+ {
+ schedule.taskStarting();
+ uow.complete(); // This completion is needed to detect overrun
+ uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner" ) );
+ schedule = uow.get( schedule ); // re-attach the entity to the new UnitOfWork
+ Task task = schedule.task().get();
+ task.run();
+ uow.complete(); // Need this to avoid ConcurrentModificationException when there has been an overrun.
+ uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner conclude" ) );
+ schedule = uow.get( schedule ); // re-attach the entity to the new UnitOfWork
+ schedule.taskCompletedSuccessfully();
+ }
+ catch( RuntimeException ex )
{
- exception = ((UndeclaredThrowableException) ex).getUndeclaredThrowable();
+ processException( schedule, ex );
}
- schedule.taskCompletedWithException( exception );
- schedule.exceptionCounter().set( schedule.exceptionCounter().get() + 1 );
+ schedule.executionCounter().set( schedule.executionCounter().get() + 1 );
+ }
+ else
+ {
+ schedule.overrun().set( schedule.overrun().get() + 1 );
}
- schedule.executionCounter().set( schedule.executionCounter().get() + 1 );
uow.complete();
}
catch( Exception e )
@@ -71,4 +78,15 @@ public class TaskRunner
throw new UndeclaredThrowableException( e );
}
}
+
+ private void processException( Schedule schedule, RuntimeException ex ) // records a failed Task run on the given Schedule
+ {
+ Throwable exception = ex;
+ while( exception instanceof UndeclaredThrowableException ) // peel off every wrapper layer until the real cause is reached
+ {
+ exception = ( (UndeclaredThrowableException) exception ).getUndeclaredThrowable(); // unwrap the current layer (not 'ex') so nested wrappers terminate
+ }
+ schedule.taskCompletedWithException( exception );
+ schedule.exceptionCounter().set( schedule.exceptionCounter().get() + 1 );
+ }
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/d85a93fe/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
index 7e9555f..48f2e6f 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
@@ -86,6 +86,12 @@ public interface Schedule extends EntityComposite
@UseDefaults
Property<Boolean> done();
+ /** Returns the number of times the Schedule has been skipped because the Task was still running.
+ *
+ * @return the number of times the Schedule has been skipped because the Task was still running.
+ */
+ @UseDefaults
+ Property<Long> overrun();
/**
* Called just before the {@link org.apache.zest.library.scheduler.Task#run()} method is called.
@@ -121,5 +127,4 @@ public interface Schedule extends EntityComposite
* @return A String representing this schedule.
*/
String presentationString();
-
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/d85a93fe/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
index 7bc8aa5..5c0a226 100644
--- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
@@ -166,9 +166,8 @@ public class SchedulerTest
uow.complete();
}
- Thread.sleep(5000);
await( usecase.name() )
- .atMost( 30, SECONDS )
+ .atMost( 6, SECONDS )
.until( taskOutput( taskIdentity ), equalTo( 4 ) );
try( UnitOfWork uow = module.newUnitOfWork( usecase ) )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/d85a93fe/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
index 6de65b7..9a78638 100644
--- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
@@ -22,21 +22,25 @@ import org.apache.zest.api.association.Association;
import org.apache.zest.api.injection.scope.Service;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.property.Property;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkDiscardOn;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkRetry;
import org.apache.zest.library.scheduler.Scheduler;
import org.apache.zest.library.scheduler.Task;
import org.apache.zest.library.scheduler.schedule.Schedule;
import org.apache.zest.library.scheduler.timeline.Timeline;
-
public class SchedulerDocs
{
-// START SNIPPET: timeline
- @Service Timeline timeline;
+ // START SNIPPET: timeline
+ @Service
+ Timeline timeline;
// END SNIPPET: timeline
-// START SNIPPET: 2
- @Service Scheduler scheduler;
+ // START SNIPPET: 2
+ @Service
+ Scheduler scheduler;
public void method()
{
@@ -45,12 +49,13 @@ public class SchedulerDocs
// myTask will be run in 10 seconds from now
}
-// END SNIPPET: 2
- MyTaskEntity todo() {
+ // END SNIPPET: 2
+ MyTaskEntity todo()
+ {
return null;
}
-// START SNIPPET: 1
+ // START SNIPPET: 1
interface MyTaskEntity extends Task
{
Property<String> myTaskState();
@@ -60,19 +65,34 @@ public class SchedulerDocs
class MyTaskMixin implements Runnable
{
- @This MyTaskEntity me;
+ @This
+ MyTaskEntity me;
@Override
public void run()
{
- me.myTaskState().set(me.anotherEntity().get().doSomeStuff(me.myTaskState().get()));
+ me.myTaskState().set( me.anotherEntity().get().doSomeStuff( me.myTaskState().get() ) );
}
}
-// END SNIPPET: 1
+ // END SNIPPET: 1
interface AnotherEntity
{
- String doSomeStuff(String p);
+ String doSomeStuff( String p );
}
+ public class MyTask implements Runnable
+ {
+
+ // START SNIPPET: strategy
+ @Override
+ @UnitOfWorkRetry( retries = 3 )
+ @UnitOfWorkDiscardOn( IllegalArgumentException.class )
+ @UnitOfWorkPropagation( value = UnitOfWorkPropagation.Propagation.REQUIRES_NEW, usecase = "Load Data" )
+ public void run()
+ {
+ // END SNIPPET: strategy
+
+ }
+ }
}
\ No newline at end of file
[4/6] zest-java git commit: ZEST-128 - Added unwinding of
UndeclaredThrowableException in TaskRunner,
to allow checked exceptions wrapping in Tasks.
Posted by ni...@apache.org.
ZEST-128 - Added unwinding of UndeclaredThrowableException in TaskRunner, to allow checked exceptions wrapping in Tasks.
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/8214c84e
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/8214c84e
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/8214c84e
Branch: refs/heads/develop
Commit: 8214c84ec7f8e5481abfc615adea0588b5503d4e
Parents: 02491a3
Author: Niclas Hedhman <ni...@hedhman.org>
Authored: Sat Nov 14 11:15:19 2015 +0800
Committer: Niclas Hedhman <ni...@hedhman.org>
Committed: Sat Nov 14 11:15:19 2015 +0800
----------------------------------------------------------------------
.../org/apache/zest/library/scheduler/TaskRunner.java | 7 ++++++-
.../apache/zest/library/scheduler/schedule/Schedule.java | 10 +++++-----
.../library/scheduler/schedule/cron/CronSchedule.java | 2 +-
.../library/scheduler/schedule/once/OnceSchedule.java | 2 +-
.../scheduler/timeline/TimelineForScheduleConcern.java | 4 ++--
5 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zest-java/blob/8214c84e/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
index 8beacee..f506129 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
@@ -55,7 +55,12 @@ public class TaskRunner
}
catch( RuntimeException ex )
{
- schedule.taskCompletedWithException( ex );
+ Throwable exception = ex;
+ while(exception instanceof UndeclaredThrowableException)
+ {
+ exception = ((UndeclaredThrowableException) exception).getUndeclaredThrowable(); // unwrap the current layer, not the original ex, so nested wrappers terminate
+ }
+ schedule.taskCompletedWithException( exception );
schedule.exceptionCounter().set( schedule.exceptionCounter().get() + 1 );
}
schedule.executionCounter().set( schedule.executionCounter().get() + 1 );
http://git-wip-us.apache.org/repos/asf/zest-java/blob/8214c84e/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
index d2da51b..7e9555f 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
@@ -18,14 +18,13 @@
*/
package org.apache.zest.library.scheduler.schedule;
+import org.apache.zest.api.association.Association;
import org.apache.zest.api.common.UseDefaults;
import org.apache.zest.api.entity.EntityComposite;
-import org.joda.time.DateTime;
-import org.apache.zest.api.association.Association;
-import org.apache.zest.api.entity.Identity;
import org.apache.zest.api.property.Immutable;
import org.apache.zest.api.property.Property;
import org.apache.zest.library.scheduler.Task;
+import org.joda.time.DateTime;
/**
* Represent the scheduling of a {@link Task}.
@@ -102,9 +101,10 @@ public interface Schedule extends EntityComposite
/**
* Called directly after the {@link org.apache.zest.library.scheduler.Task#run()} method has been completed but
* threw a RuntimeException.
- * @param ex
+ * @param ex The exception that was thrown in the Task. If the thrown Exception was an
+ * {@link java.lang.reflect.UndeclaredThrowableException} then the underlying exception is passed here.
*/
- void taskCompletedWithException( RuntimeException ex );
+ void taskCompletedWithException( Throwable ex );
/**
* Compute the next time this schedule is to be run.
http://git-wip-us.apache.org/repos/asf/zest-java/blob/8214c84e/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
index d137cb3..9036592 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
@@ -56,7 +56,7 @@ public interface CronSchedule
}
@Override
- public void taskCompletedWithException( RuntimeException ex )
+ public void taskCompletedWithException( Throwable ex )
{
running().set(false);
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/8214c84e/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
index 66fdb21..ca31cf4 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
@@ -39,7 +39,7 @@ public interface OnceSchedule
}
@Override
- public void taskCompletedWithException( RuntimeException ex )
+ public void taskCompletedWithException( Throwable ex )
{
running().set( false );
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/8214c84e/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineForScheduleConcern.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineForScheduleConcern.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineForScheduleConcern.java
index ff67025..6bb5c3e 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineForScheduleConcern.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineForScheduleConcern.java
@@ -53,7 +53,7 @@ public abstract class TimelineForScheduleConcern
}
@Override
- public void taskCompletedWithException( RuntimeException ex )
+ public void taskCompletedWithException( Throwable ex )
{
TimelineRecordStep step = TimelineRecordStep.FAILURE;
String details = "Exception occurred:" + getStackTrace( ex );
@@ -78,7 +78,7 @@ public abstract class TimelineForScheduleConcern
state.history().set( timelineRecords );
}
- private String getStackTrace( RuntimeException ex )
+ private String getStackTrace( Throwable ex )
{
ByteArrayOutputStream baos = new ByteArrayOutputStream( 1000 );
BufferedOutputStream out = new BufferedOutputStream( baos );
[3/6] zest-java git commit: ZEST-128 - Fixed up Scheduler library so
that schedules are not lost. Tried to introduce better separation of
concerns. Added counters for Execution and Exceptions.
Posted by ni...@apache.org.
ZEST-128 - Fixed up Scheduler library so that schedules are not lost. Tried to introduce better separation of concerns. Added counters for Execution and Exceptions.
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/02491a34
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/02491a34
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/02491a34
Branch: refs/heads/develop
Commit: 02491a34e8ee1a118e358f5ed4f667b4d70eae82
Parents: 80284fb
Author: Niclas Hedhman <ni...@hedhman.org>
Authored: Sat Nov 14 11:09:38 2015 +0800
Committer: Niclas Hedhman <ni...@hedhman.org>
Committed: Sat Nov 14 11:09:38 2015 +0800
----------------------------------------------------------------------
.../DefaultEntityStoreUnitOfWork.java | 1 -
.../ContextResourceClientFactoryTest.java | 4 +-
.../restlet/identity/IdentityManager.java | 4 +-
.../zest/library/scheduler/Execution.java | 240 ++++++++++++++
.../zest/library/scheduler/Scheduler.java | 53 +++-
.../scheduler/SchedulerConfiguration.java | 4 +-
.../zest/library/scheduler/SchedulerMixin.java | 318 ++++---------------
.../library/scheduler/SchedulerService.java | 53 +---
.../library/scheduler/SchedulesHandler.java | 89 ++++++
.../org/apache/zest/library/scheduler/Task.java | 23 +-
.../zest/library/scheduler/TaskRunner.java | 69 ++++
.../scheduler/bootstrap/SchedulerAssembler.java | 14 +-
.../defaults/DefaultRejectionHandler.java | 39 +++
.../defaults/DefaultScheduleFactoryMixin.java | 91 ++++++
.../defaults/DefaultThreadFactory.java | 56 ++++
.../library/scheduler/schedule/Schedule.java | 55 +++-
.../scheduler/schedule/ScheduleFactory.java | 115 +------
.../scheduler/schedule/ScheduleTime.java | 21 +-
.../scheduler/schedule/cron/CronSchedule.java | 22 +-
.../scheduler/schedule/once/OnceSchedule.java | 22 +-
.../library/scheduler/timeline/Timeline.java | 4 +-
.../timeline/TimelineScheduleMixin.java | 5 +-
.../timeline/TimelineSchedulerServiceMixin.java | 30 +-
.../scheduler/AbstractSchedulerTest.java | 22 +-
.../apache/zest/library/scheduler/FooTask.java | 19 +-
.../zest/library/scheduler/SchedulerTest.java | 87 +++--
.../scheduler/docsupport/SchedulerDocs.java | 2 +-
27 files changed, 938 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java
----------------------------------------------------------------------
diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java
index bb873d1..f6e48f1 100755
--- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java
+++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java
@@ -82,7 +82,6 @@ public final class DefaultEntityStoreUnitOfWork
public EntityState entityStateOf( ModuleSpi module, EntityReference anIdentity )
throws EntityNotFoundException
{
-
EntityState entityState = states.get( anIdentity );
if( entityState != null )
{
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java b/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java
index 0efbe9d..47ff667 100644
--- a/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java
+++ b/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java
@@ -21,6 +21,7 @@ package org.apache.zest.library.rest.client;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import org.apache.zest.api.usecase.UsecaseBuilder;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
@@ -597,7 +598,8 @@ public class ContextResourceClientFactoryTest
public void beforeCompletion()
throws UnitOfWorkCompletionException
{
- throw new ConcurrentEntityModificationException( Collections.<EntityComposite>emptyList() );
+ throw new ConcurrentEntityModificationException( Collections.<EntityComposite>emptyList(),
+ UsecaseBuilder.newUsecase( "Testing" ) );
}
public void afterCompletion( UnitOfWorkStatus status )
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java
----------------------------------------------------------------------
diff --git a/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java b/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java
index 5286413..0f659f9 100644
--- a/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java
+++ b/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java
@@ -40,7 +40,7 @@ import static org.apache.zest.functional.Iterables.first;
@Concerns( { UnitOfWorkConcern.class } )
public interface IdentityManager
{
- char SEPARATOR = '~';
+ String SEPARATOR = "~";
String IDENTITY_SIGNATURE = "[0-9][0-9]*~.*";
boolean isIdentity( String candidate );
@@ -123,7 +123,7 @@ public interface IdentityManager
@Override
public Class extractType( String identity )
{
- if( isIdentity( identity ) )
+ if( !isIdentity( identity ) )
{
throw new IllegalArgumentException( "Given argument '" + identity + "' is not an Identity" );
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
new file mode 100644
index 0000000..dbb3b72
--- /dev/null
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.library.scheduler;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.zest.api.concern.Concerns;
+import org.apache.zest.api.configuration.Configuration;
+import org.apache.zest.api.injection.scope.Structure;
+import org.apache.zest.api.injection.scope.This;
+import org.apache.zest.api.mixin.Mixins;
+import org.apache.zest.api.structure.Module;
+import org.apache.zest.api.unitofwork.NoSuchEntityException;
+import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkRetry;
+import org.apache.zest.library.scheduler.schedule.Schedule;
+import org.apache.zest.library.scheduler.schedule.ScheduleTime;
+
+@Mixins( Execution.ExecutionMixin.class )
+@Concerns( UnitOfWorkConcern.class )
+public interface Execution
+{
+
+ void dispatchForExecution( Schedule schedule );
+
+ void start()
+ throws Exception;
+
+ void stop()
+ throws Exception;
+
+ @UnitOfWorkPropagation
+ @UnitOfWorkRetry( retries = 3 )
+ void updateNextTime( ScheduleTime schedule );
+
+ class ExecutionMixin
+ implements Execution, Runnable
+ {
+ private static final ThreadGroup TG = new ThreadGroup( "Zest Scheduling" );
+
+ @Structure
+ private Module module;
+
+ @This
+ private Scheduler scheduler;
+
+ @This
+ private Configuration<SchedulerConfiguration> config;
+
+ @This
+ private ThreadFactory threadFactory;
+
+ @This
+ private RejectedExecutionHandler rejectionHandler;
+
+ private final SortedSet<ScheduleTime> timingQueue = new TreeSet<>();
+ private volatile boolean running;
+ private ThreadPoolExecutor taskExecutor;
+ private Thread scheduleThread;
+
+ @Override
+ @UnitOfWorkPropagation
+ public void run()
+ {
+ synchronized( this )
+ {
+ running = true;
+ while( running )
+ {
+ try
+ {
+ if( timingQueue.size() > 0 )
+ {
+ ScheduleTime scheduleTime = timingQueue.first();
+ waitFor( scheduleTime );
+ timingQueue.remove( scheduleTime );
+ updateNextTime( scheduleTime );
+ submitTaskForExecution( scheduleTime );
+ }
+ else
+ {
+ this.wait( 100 );
+ }
+ }
+ catch( InterruptedException e )
+ {
+ // Ignore. Used to signal "Hey, wake up. Time to work..."
+ System.out.println("Interrupted");
+ }
+ }
+ }
+ }
+
+ private void waitFor( ScheduleTime scheduleTime )
+ throws InterruptedException
+ {
+ long now = System.currentTimeMillis();
+ long waitingTime = scheduleTime.nextTime() - now;
+ if( waitingTime > 0 )
+ {
+ this.wait( waitingTime );
+ }
+ }
+
+ @Override
+ public void updateNextTime( ScheduleTime scheduleTime )
+ {
+ long now = System.currentTimeMillis();
+
+ try (UnitOfWork uow = module.newUnitOfWork())
+ {
+ try
+ {
+ Schedule schedule = uow.get( Schedule.class, scheduleTime.scheduleIdentity() );
+ long nextTime = schedule.nextRun( now );
+ if( nextTime != Long.MIN_VALUE )
+ {
+ scheduleTime = new ScheduleTime( schedule.identity().get(), nextTime );
+ timingQueue.add( scheduleTime );
+ }
+ }
+ catch( NoSuchEntityException e )
+ {
+ // Schedule has been removed.
+ scheduler.cancelSchedule( scheduleTime.scheduleIdentity() );
+ }
+ uow.complete();
+ }
+ catch( UnitOfWorkCompletionException e )
+ {
+ throw new UndeclaredThrowableException( e );
+ }
+ }
+
+ private void submitTaskForExecution( ScheduleTime scheduleTime )
+ {
+ Runnable taskRunner = module.newTransient( Runnable.class, scheduleTime );
+ this.taskExecutor.submit( taskRunner );
+ }
+
+ public void dispatchForExecution( Schedule schedule )
+ {
+ long now = System.currentTimeMillis();
+ synchronized( this )
+ {
+ long nextRun = schedule.nextRun( now );
+ if( nextRun > 0 )
+ {
+ timingQueue.add( new ScheduleTime( schedule.identity().get(), nextRun ) );
+ scheduleThread.interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void start()
+ throws Exception
+ {
+ SchedulerConfiguration configuration = config.get();
+ Integer workersCount = configuration.workersCount().get();
+ Integer workQueueSize = configuration.workQueueSize().get();
+ createThreadPoolExecutor( workersCount, workQueueSize );
+ taskExecutor.prestartAllCoreThreads();
+
+ scheduleThread = new Thread( TG, this, "Scheduler" );
+ scheduleThread.start();
+ }
+
+ private void createThreadPoolExecutor( Integer workersCount, Integer workQueueSize )
+ {
+ int corePoolSize = 2;
+ if( workersCount > 4 )
+ {
+ corePoolSize = workersCount / 4 + 1;
+ }
+ if( corePoolSize > 50 )
+ {
+ corePoolSize = 20;
+ }
+ if( workersCount > 200 )
+ {
+ workersCount = 200;
+ }
+ taskExecutor = new ThreadPoolExecutor( corePoolSize, workersCount,
+ 0, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>( workQueueSize ),
+ threadFactory, rejectionHandler );
+ }
+
+ @Override
+ public void stop()
+ throws Exception
+ {
+
+ running = false;
+ synchronized( this )
+ {
+ scheduleThread.interrupt();
+ }
+ taskExecutor.shutdown();
+ try
+ {
+ taskExecutor.awaitTermination( 5, TimeUnit.SECONDS );
+ }
+ catch( InterruptedException e )
+ {
+ e.printStackTrace();
+ }
+ taskExecutor.shutdownNow();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java
index 2752461..f8aae19 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java
@@ -45,10 +45,12 @@ import static org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation.Propa
* By default, a {@link Schedule} is not durable. In other words, it do not survive an {@link Application} restart.
* </p>
* <p>
- * To make a {@link Schedule} durable, set it's durable property to true once its scheduled.
+ * All {@link Schedule}s are durable and stored in the visible {@link org.apache.zest.spi.entitystore.EntityStore} like
+ * any ordinary {@link org.apache.zest.api.entity.EntityComposite}. There is also a {@link org.apache.zest.library.scheduler.schedule.Schedules}
+ * entity composite that has Associations to all active, completed and cancelled schedules.
* </p>
* <p>
- * Durable {@link Schedule}s that have no future run are removed by {@code SchedulerGarbageCollector} (not implemented?).
+ *
* </p>
*/
@Concerns( UnitOfWorkConcern.class )
@@ -59,36 +61,33 @@ public interface Scheduler
*
* @param task Task to be scheduled once
* @param initialSecondsDelay Initial delay the Task will be run after, in seconds
- * @param durable true if this Schedule should survive a restart.
*
* @return The newly created Schedule
*/
@UnitOfWorkPropagation( MANDATORY )
- Schedule scheduleOnce( Task task, int initialSecondsDelay, boolean durable );
+ Schedule scheduleOnce( Task task, int initialSecondsDelay );
/**
* Schedule a Task to be run after a given initial delay in seconds.
*
- * @param task Task to be scheduled once
- * @param runAt The future point in time when the Schedule will be run.
- * @param durable true if this Schedule should survive a restart.
+ * @param task Task to be scheduled once
+ * @param runAt The future point in time when the Schedule will be run.
*
* @return The newly created Schedule
*/
@UnitOfWorkPropagation( MANDATORY )
- Schedule scheduleOnce( Task task, DateTime runAt, boolean durable );
+ Schedule scheduleOnce( Task task, DateTime runAt );
/**
* Schedule a Task using a CronExpression.
*
* @param task Task to be scheduled once
* @param cronExpression CronExpression for creating the Schedule for the given Task
- * @param durable true if this Schedule should survive a restart.
*
* @return The newly created Schedule
*/
@UnitOfWorkPropagation( MANDATORY )
- Schedule scheduleCron( Task task, @CronExpression String cronExpression, boolean durable );
+ Schedule scheduleCron( Task task, @CronExpression String cronExpression );
/**
* Schedule a Task using a CronExpression with a given initial delay in milliseconds.
@@ -96,12 +95,11 @@ public interface Scheduler
* @param task Task to be scheduled once
* @param cronExpression CronExpression for creating the Schedule for the given Task
* @param initialDelay Initial delay the Schedule will be active after, in milliseconds
- * @param durable true if this Schedule should survive a restart.
*
* @return The newly created Schedule
*/
@UnitOfWorkPropagation( MANDATORY )
- Schedule scheduleCron( Task task, @CronExpression String cronExpression, long initialDelay, boolean durable );
+ Schedule scheduleCron( Task task, @CronExpression String cronExpression, long initialDelay );
/**
* Schedule a Task using a CronExpression starting at a given date.
@@ -109,10 +107,35 @@ public interface Scheduler
* @param task Task to be scheduled once
* @param cronExpression CronExpression for creating the Schedule for the given Task
* @param start Date from which the Schedule will become active
- * @param durable true if this Schedule should survive a restart.
*
* @return The newly created Schedule
*/
@UnitOfWorkPropagation( MANDATORY )
- Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start, boolean durable );
-}
+ Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start );
+
+ /** Schedules a custom Schedule.
+ *
+ *
+ * @param schedule The Schedule instance to be scheduled.
+ */
+ @UnitOfWorkPropagation( MANDATORY )
+ void scheduleCron( Schedule schedule );
+
+ /** Cancels a Schedule.
+ * Reads the Schedule from the EntityStore and calls {@link #cancelSchedule(Schedule)}.
+ *
+ * @param scheduleId The identity of the Schedule to be cancelled.
+ */
+ @UnitOfWorkPropagation( MANDATORY )
+ void cancelSchedule( String scheduleId );
+
+ /** Cancels the provided Schedule.
+ *
+ * Cancellation can be done before, while and after execution of the Schedule. If the execution
+ * is in progress, it will not be interrupted.
+ *
+ * @param schedule The schedule to be cancelled.
+ */
+ @UnitOfWorkPropagation( MANDATORY )
+ public void cancelSchedule( Schedule schedule );
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
index e338c31..0ebc81d 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java
@@ -33,13 +33,13 @@ public interface SchedulerConfiguration
/**
* @return Number of worker threads, optional and defaults to the number of available cores.
*/
- @Optional
+ @Optional @UseDefaults
Property<Integer> workersCount();
/**
* @return Size of the queue to use for holding tasks before they are run, optional and defaults to 10.
*/
- @Optional
+ @Optional @UseDefaults
Property<Integer> workQueueSize();
/**
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
index 69329dc..52c2f56 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
@@ -18,16 +18,6 @@
*/
package org.apache.zest.library.scheduler;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.injection.scope.Service;
import org.apache.zest.api.injection.scope.Structure;
@@ -37,11 +27,9 @@ import org.apache.zest.api.structure.Module;
import org.apache.zest.api.unitofwork.NoSuchEntityException;
import org.apache.zest.api.unitofwork.UnitOfWork;
import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
-import org.apache.zest.api.usecase.Usecase;
import org.apache.zest.api.usecase.UsecaseBuilder;
import org.apache.zest.library.scheduler.schedule.Schedule;
import org.apache.zest.library.scheduler.schedule.ScheduleFactory;
-import org.apache.zest.library.scheduler.schedule.ScheduleTime;
import org.apache.zest.library.scheduler.schedule.Schedules;
import org.apache.zest.library.scheduler.schedule.cron.CronExpression;
import org.joda.time.DateTime;
@@ -52,17 +40,10 @@ public class SchedulerMixin
implements Scheduler, ServiceActivation
{
private static final Logger LOGGER = LoggerFactory.getLogger( Scheduler.class );
- private static final int DEFAULT_WORKERS_COUNT = Runtime.getRuntime().availableProcessors() + 1;
- private static final int DEFAULT_WORKQUEUE_SIZE = 10;
@Service
private ScheduleFactory scheduleFactory;
- private final SortedSet<ScheduleTime> timingQueue = new TreeSet<>();
-
- private ScheduledExecutorService managementExecutor;
- private ThreadPoolExecutor taskExecutor;
-
@Structure
private Module module;
@@ -70,304 +51,137 @@ public class SchedulerMixin
private SchedulerService me;
@This
- private ThreadFactory threadFactory;
+ private SchedulesHandler schedulesHandler;
@This
- private RejectedExecutionHandler rejectionHandler;
+ private Execution execution;
@This
private Configuration<SchedulerConfiguration> config;
- private ScheduleHandler scheduleHandler;
+ public SchedulerMixin()
+ {
+ }
@Override
- public Schedule scheduleOnce( Task task, int initialSecondsDelay, boolean durable )
+ public Schedule scheduleOnce( Task task, int initialSecondsDelay )
{
long now = System.currentTimeMillis();
- Schedule schedule = scheduleFactory.newOnceSchedule( task, new DateTime( now + initialSecondsDelay * 1000 ), durable );
- if( durable )
- {
- Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) );
- schedules.schedules().add( schedule );
- }
- dispatchForExecution( schedule );
+ Schedule schedule = scheduleFactory.newOnceSchedule( task, new DateTime( now + initialSecondsDelay * 1000 ) );
+ saveAndDispatch( schedule );
return schedule;
}
@Override
- public Schedule scheduleOnce( Task task, DateTime runAt, boolean durable )
+ public Schedule scheduleOnce( Task task, DateTime runAt )
{
- Schedule schedule = scheduleFactory.newOnceSchedule( task, runAt, durable );
- dispatchForExecution( schedule );
- if( durable )
- {
- Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) );
- schedules.schedules().add( schedule );
- }
+ Schedule schedule = scheduleFactory.newOnceSchedule( task, runAt );
+ saveAndDispatch( schedule );
return schedule;
}
@Override
- public Schedule scheduleCron( Task task, String cronExpression, boolean durable )
+ public Schedule scheduleCron( Task task, String cronExpression )
{
DateTime now = new DateTime();
- Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, now, durable );
- if( durable )
- {
- Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) );
- schedules.schedules().add( schedule );
- }
- dispatchForExecution( schedule );
+ Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, now );
+ saveAndDispatch( schedule );
return schedule;
}
@Override
- public Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start, boolean durable )
+ public Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start )
{
- Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start, durable );
- if( durable )
- {
- Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) );
- schedules.schedules().add( schedule );
- }
- dispatchForExecution( schedule );
+ Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start );
+ saveAndDispatch( schedule );
return schedule;
}
@Override
- public Schedule scheduleCron( Task task, String cronExpression, long initialDelay, boolean durable )
- {
- DateTime start = new DateTime( System.currentTimeMillis() + initialDelay );
- Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start, durable );
- if( durable )
- {
- Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) );
- schedules.schedules().add( schedule );
- }
- dispatchForExecution( schedule );
- return schedule;
- }
-
- private void dispatchForExecution( Schedule schedule )
- {
- long now = System.currentTimeMillis();
- synchronized( timingQueue )
- {
- if( timingQueue.size() == 0 )
- {
- long nextRun = schedule.nextRun( now );
- if( nextRun < 0 )
- {
- return;
- }
- timingQueue.add( new ScheduleTime( schedule.identity().get(), nextRun ) );
- if( scheduleHandler == null )
- {
- dispatchHandler();
- }
- }
- else
- {
- ScheduleTime first = timingQueue.first();
- long nextRun = schedule.nextRun( now );
- if( nextRun < 0 )
- {
- return;
- }
- timingQueue.add( new ScheduleTime( schedule.identity().get(), nextRun ) );
- ScheduleTime newFirst = timingQueue.first();
- if( !first.equals( newFirst ) )
- {
- // We need to restart the managementThread, which is currently waiting for a 'later' event to
- // occur than the one that was just scheduled.
- if( scheduleHandler != null && scheduleHandler.future != null )
- {
- scheduleHandler.future.cancel( true );
- }
- dispatchHandler();
- }
- }
- }
- }
-
- private void dispatchHandler()
+ public void scheduleCron( Schedule schedule )
{
- scheduleHandler = new ScheduleHandler();
- managementExecutor.schedule( scheduleHandler, timingQueue.first().nextTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS );
+ saveAndDispatch( schedule );
}
@Override
- public void activateService()
- throws Exception
+ public Schedule scheduleCron( Task task, String cronExpression, long initialDelay )
{
- // Handle configuration defaults
- SchedulerConfiguration configuration = config.get();
- Integer workersCount = configuration.workersCount().get();
- Integer workQueueSize = configuration.workQueueSize().get();
-
- if( workersCount == null )
- {
- workersCount = DEFAULT_WORKERS_COUNT;
- LOGGER.debug( "Workers count absent from configuration, falled back to default: {} workers", DEFAULT_WORKERS_COUNT );
- }
- if( workQueueSize == null )
- {
- workQueueSize = DEFAULT_WORKQUEUE_SIZE;
- LOGGER.debug( "WorkQueue size absent from configuration, falled back to default: {}", DEFAULT_WORKQUEUE_SIZE );
- }
-
- int corePoolSize = 2;
- if( workersCount > 4 )
- {
- corePoolSize = workersCount / 4;
- }
- // Throws IllegalArgument if corePoolSize or keepAliveTime less than zero,
- // or if workersCount less than or equal to zero,
- // or if corePoolSize greater than workersCount.
- taskExecutor = new ThreadPoolExecutor( corePoolSize, workersCount,
- 0, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>( workQueueSize ),
- threadFactory, rejectionHandler );
- taskExecutor.prestartAllCoreThreads();
- managementExecutor = new ScheduledThreadPoolExecutor( 2, threadFactory, rejectionHandler );
- loadSchedules();
- LOGGER.debug( "Activated" );
+ DateTime start = new DateTime( System.currentTimeMillis() + initialDelay );
+ Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start );
+ saveAndDispatch( schedule );
+ return schedule;
}
- private void loadSchedules()
- throws UnitOfWorkCompletionException
+ @Override
+ public void cancelSchedule( String scheduleId )
{
- UnitOfWork uow = module.newUnitOfWork();
+ UnitOfWork uow = module.currentUnitOfWork();
+ Schedule schedule = null;
try
{
- Schedules schedules = uow.get( Schedules.class, getSchedulesIdentity( me ) );
- for( Schedule schedule : schedules.schedules() )
- {
- dispatchForExecution( schedule );
- }
+ schedule = uow.get( Schedule.class, scheduleId );
}
catch( NoSuchEntityException e )
{
- // Create a new Schedules entity for keeping track of them all.
- uow.newEntity( Schedules.class, getSchedulesIdentity( me ) );
- uow.complete();
- }
- finally
- {
- if( uow.isOpen() )
- {
- uow.discard();
- }
+ return;
}
+ cancelSchedule( schedule );
}
- public static String getSchedulesIdentity( SchedulerService service )
+ @Override
+ public void cancelSchedule( Schedule schedule )
{
- return "Schedules:" + service.identity().get();
+ Schedules active = schedulesHandler.getActiveSchedules();
+ if( active.schedules().remove( schedule ) )
+ {
+ schedule.cancelled().set( true );
+ }
}
- @Override
- public void passivateService()
- throws Exception
+ private void saveAndDispatch( Schedule schedule )
{
- managementExecutor.shutdown();
- taskExecutor.shutdown();
-
- managementExecutor.awaitTermination( 5, TimeUnit.SECONDS );
- managementExecutor.shutdownNow();
-
- taskExecutor.awaitTermination( 5, TimeUnit.SECONDS );
- taskExecutor.shutdownNow();
-
- LOGGER.debug( "Passivated" );
+ Schedules schedules = schedulesHandler.getActiveSchedules();
+ schedules.schedules().add( schedule );
+ execution.dispatchForExecution( schedule );
}
- /**
- * This little bugger wakes up when it is time to dispatch a Task, creates the Runner and dispatches itself
- * for the next run.
- */
- class ScheduleHandler
- implements Runnable
+ private void loadSchedules()
+ throws UnitOfWorkCompletionException
{
- private ScheduledFuture<?> future;
-
- @Override
- public void run()
+ try (UnitOfWork ignored = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Initialize Schedules" ) ))
{
- synchronized( timingQueue )
+ Schedules schedules = schedulesHandler.getActiveSchedules();
+ for( Schedule schedule : schedules.schedules() )
{
- ScheduleTime scheduleTime = timingQueue.first();
- timingQueue.remove( scheduleTime );
- ScheduleRunner scheduleRunner = new ScheduleRunner( scheduleTime, SchedulerMixin.this, module );
- taskExecutor.submit( scheduleRunner );
- if( timingQueue.size() == 0 )
+ if( schedule.cancelled().get() || schedule.done().get() )
{
- scheduleHandler = null;
+ schedules.schedules().remove( schedule );
}
else
{
- ScheduleTime nextTime = timingQueue.first();
- future = managementExecutor.schedule( scheduleHandler, nextTime.nextTime, TimeUnit.MILLISECONDS );
+ execution.dispatchForExecution( schedule );
}
}
}
}
- /**
- * Handle {@link Task}'s {@link org.apache.zest.api.unitofwork.UnitOfWork} and {@link org.apache.zest.library.scheduler.timeline.TimelineRecord}s creation.
- */
- public static class ScheduleRunner
- implements Runnable
+ @Override
+ public void activateService()
+ throws Exception
{
- private final Module module;
- private final ScheduleTime schedule;
- private final SchedulerMixin schedulerMixin;
-
- public ScheduleRunner( ScheduleTime schedule, SchedulerMixin schedulerMixin, Module module )
- {
- this.schedule = schedule;
- this.schedulerMixin = schedulerMixin;
- this.module = module;
- }
+ // Throws IllegalArgument if corePoolSize or keepAliveTime less than zero,
+ // or if workersCount less than or equal to zero,
+ // or if corePoolSize greater than workersCount.
+ loadSchedules();
+ execution.start();
+ LOGGER.debug( "Activated" );
+ }
- // WARN Watch this code, see if we can do better, maybe leverage @UnitOfWorkRetry
- @Override
- public void run()
- {
- Usecase usecase = UsecaseBuilder.newUsecase( "ScheduleRunner" );
- UnitOfWork uow = module.newUnitOfWork( usecase );
- Schedule schedule = null;
- try
- {
- schedule = uow.get( Schedule.class, this.schedule.scheduleIdentity );
- Task task = schedule.task().get();
- try
- {
- schedule.taskStarting();
- task.run();
- schedule.taskCompletedSuccessfully();
- }
- catch( RuntimeException ex )
- {
- schedule.taskCompletedWithException( ex );
- }
- uow.complete();
- }
- catch( UnitOfWorkCompletionException ex )
- {
- }
- finally
- {
- if( schedule != null )
- {
- schedulerMixin.dispatchForExecution( schedule );
- }
- // What should we do if we can't manage the Running flag??
- if( uow.isOpen() )
- {
- uow.discard();
- }
- }
- }
+ @Override
+ public void passivateService()
+ throws Exception
+ {
+ execution.stop();
+ LOGGER.debug( "Passivated" );
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java
index 1200594..0d1ef4c 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java
@@ -18,61 +18,14 @@
*/
package org.apache.zest.library.scheduler;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zest.api.entity.Identity;
-import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.mixin.Mixins;
import org.apache.zest.api.service.ServiceActivation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.zest.library.scheduler.defaults.DefaultRejectionHandler;
+import org.apache.zest.library.scheduler.defaults.DefaultThreadFactory;
-@Mixins( { SchedulerMixin.class, SchedulerService.ThreadFactory.class, SchedulerService.RejectionHandler.class } )
+@Mixins( { SchedulerMixin.class, DefaultThreadFactory.class, DefaultRejectionHandler.class } )
public interface SchedulerService
extends Scheduler, ServiceActivation, Identity
{
- class RejectionHandler
- implements RejectedExecutionHandler
- {
- private static final Logger LOGGER = LoggerFactory.getLogger( SchedulerService.class );
-
- @Override
- public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
- {
- LOGGER.error( "Runnable [" + r + "] was rejected by executor [" + executor + "]" );
- }
- }
-
- class ThreadFactory
- implements java.util.concurrent.ThreadFactory
- {
- private static final AtomicInteger POOL_NUMBER = new AtomicInteger( 1 );
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger( 1 );
- private final String namePrefix;
-
- protected ThreadFactory( @This SchedulerService me )
- {
- SecurityManager sm = System.getSecurityManager();
- group = ( sm != null ) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
- namePrefix = me.identity().get() + "-P" + POOL_NUMBER.getAndIncrement() + "W";
- }
-
- @Override
- public Thread newThread( Runnable runnable )
- {
- Thread thread = new Thread( group, runnable, namePrefix + threadNumber.getAndIncrement(), 0 );
- if( thread.isDaemon() )
- {
- thread.setDaemon( false );
- }
- if( thread.getPriority() != Thread.NORM_PRIORITY )
- {
- thread.setPriority( Thread.NORM_PRIORITY );
- }
- return thread;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java
new file mode 100644
index 0000000..b76ef96
--- /dev/null
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.library.scheduler;
+
+import org.apache.zest.api.entity.Identity;
+import org.apache.zest.api.injection.scope.Structure;
+import org.apache.zest.api.injection.scope.This;
+import org.apache.zest.api.mixin.Mixins;
+import org.apache.zest.api.structure.Module;
+import org.apache.zest.api.unitofwork.NoSuchEntityException;
+import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
+import org.apache.zest.library.scheduler.schedule.Schedules;
+
+@Mixins(SchedulesHandler.SchedulesHandlerMixin.class)
+public interface SchedulesHandler
+{
+ @UnitOfWorkPropagation( UnitOfWorkPropagation.Propagation.MANDATORY)
+ Schedules getActiveSchedules();
+
+ @UnitOfWorkPropagation( UnitOfWorkPropagation.Propagation.MANDATORY)
+ Schedules getCancelledSchedules();
+
+ class SchedulesHandlerMixin implements SchedulesHandler
+ {
+ @This
+ private Identity me;
+
+ @Structure
+ private Module module;
+
+ @Override
+ public Schedules getActiveSchedules()
+ {
+ return getOrCreateSchedules(getActiveSchedulesIdentity());
+ }
+
+ @Override
+ public Schedules getCancelledSchedules()
+ {
+ return getOrCreateSchedules(getCancelledSchedulesIdentity());
+ }
+
+ public String getActiveSchedulesIdentity()
+ {
+ return "Schedules-Active:" + me.identity().get();
+ }
+
+ public String getCancelledSchedulesIdentity()
+ {
+ return "Schedules-Cancelled:" + me.identity().get();
+ }
+
+ private Schedules getOrCreateSchedules( String identity ){
+ UnitOfWork uow = module.currentUnitOfWork();
+ Schedules schedules;
+ try
+ {
+ schedules = uow.get( Schedules.class, identity );
+ }
+ catch( NoSuchEntityException e )
+ {
+ // Create a new Schedules entity for keeping track of them all.
+ schedules = uow.newEntity( Schedules.class, identity );
+ }
+ return schedules;
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java
index 6832ce5..6cc284c 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java
@@ -20,17 +20,28 @@ package org.apache.zest.library.scheduler;
import java.util.List;
import org.apache.zest.api.common.UseDefaults;
+import org.apache.zest.api.concern.Concerns;
import org.apache.zest.api.property.Property;
import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
/**
* Compose an Entity using this type to be able to Schedule it.
+ *<p>
+ * A Task is associated with a {@link org.apache.zest.library.scheduler.schedule.Schedule}, and when it is time to execute,
+ * the SchedulerService will dispatch a TaskRunner in a new thread and establish a UnitOfWork (Usecase name of "Task Runner").
+ *</p>
+ *<p>
+ * The {@code Task} type declares the {@link UnitOfWorkConcern} and therefore the {@code Task} implementation may
+ * declare the {@link UnitOfWorkPropagation} annotation with the
+ * {@link org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation.Propagation#REQUIRES_NEW} and a different
+ * {@link UnitOfWork} strategy, such as {@code Retries} and {@code DiscardOn}.
*
- * A Task is wrapped in a {@link org.apache.zest.library.scheduler.SchedulerMixin.ScheduleRunner} before being run by an executor.
- * {@link org.apache.zest.library.scheduler.SchedulerMixin.ScheduleRunner} wrap a {@link UnitOfWork} around the {@link Task#run()} invocation.
+ *</p>
*
* Here is a simple example:
- * <pre>
+ * <pre><code>
* interface MyTask
* extends Task
* {
@@ -42,15 +53,18 @@ import org.apache.zest.api.unitofwork.UnitOfWork;
* implements Runnable
* {
* @This MyTaskEntity me;
+ *
* public void run()
* {
* me.customState().set( me.anotherEntity().get().doSomeStuff( me.customState().get() ) );
* }
* }
- * </pre>
+ * </code></pre>
+ *
* Finally, {@literal MyTask} must be assembled into an {@literal EntityComposite}.
*/
// START SNIPPET: task
+@Concerns( UnitOfWorkConcern.class )
public interface Task
extends Runnable
{
@@ -58,5 +72,6 @@ public interface Task
@UseDefaults
Property<List<String>> tags();
+
}
// END SNIPPET: task
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
new file mode 100644
index 0000000..8beacee
--- /dev/null
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.library.scheduler;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import org.apache.zest.api.injection.scope.Structure;
+import org.apache.zest.api.injection.scope.Uses;
+import org.apache.zest.api.structure.Module;
+import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
+import org.apache.zest.library.scheduler.schedule.Schedule;
+import org.apache.zest.library.scheduler.schedule.ScheduleTime;
+
+public class TaskRunner
+ implements Runnable
+{
+ @Structure
+ private Module module;
+
+ @Uses
+ private ScheduleTime schedule;
+
+ @Override
+ @UnitOfWorkPropagation( usecase = "Task Runner" )
+ public void run()
+ {
+ try
+ {
+ UnitOfWork uow = module.currentUnitOfWork();
+ Schedule schedule = uow.get( Schedule.class, this.schedule.scheduleIdentity() );
+ Task task = schedule.task().get();
+ try
+ {
+ schedule.taskStarting();
+ task.run();
+ schedule.taskCompletedSuccessfully();
+ }
+ catch( RuntimeException ex )
+ {
+ schedule.taskCompletedWithException( ex );
+ schedule.exceptionCounter().set( schedule.exceptionCounter().get() + 1 );
+ }
+ schedule.executionCounter().set( schedule.executionCounter().get() + 1 );
+ uow.complete();
+ }
+ catch( Exception e )
+ {
+ throw new UndeclaredThrowableException( e );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java
index 92a7aa9..af5f475 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java
@@ -18,6 +18,7 @@
*/
package org.apache.zest.library.scheduler.bootstrap;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern;
import org.apache.zest.bootstrap.Assemblers;
import org.apache.zest.bootstrap.AssemblyException;
import org.apache.zest.bootstrap.EntityDeclaration;
@@ -26,6 +27,7 @@ import org.apache.zest.bootstrap.ServiceDeclaration;
import org.apache.zest.bootstrap.ValueDeclaration;
import org.apache.zest.library.scheduler.SchedulerConfiguration;
import org.apache.zest.library.scheduler.SchedulerService;
+import org.apache.zest.library.scheduler.TaskRunner;
import org.apache.zest.library.scheduler.schedule.ScheduleFactory;
import org.apache.zest.library.scheduler.schedule.Schedules;
import org.apache.zest.library.scheduler.schedule.cron.CronSchedule;
@@ -55,6 +57,10 @@ import org.apache.zest.library.scheduler.timeline.TimelineSchedulerServiceMixin;
public class SchedulerAssembler
extends Assemblers.VisibilityConfig<SchedulerAssembler>
{
+
+ private static final int DEFAULT_WORKERS_COUNT = Runtime.getRuntime().availableProcessors() + 1;
+ private static final int DEFAULT_WORKQUEUE_SIZE = 10;
+
private boolean timeline;
/**
@@ -82,6 +88,8 @@ public class SchedulerAssembler
.visibleIn( visibility() )
.instantiateOnStartup();
+ assembly.transients( Runnable.class ).withMixins( TaskRunner.class ).withConcerns( UnitOfWorkConcern.class );
+
if( timeline )
{
scheduleEntities.withTypes( Timeline.class )
@@ -99,7 +107,11 @@ public class SchedulerAssembler
if( hasConfig() )
{
- configModule().entities( SchedulerConfiguration.class ).visibleIn( configVisibility() );
+ configModule().entities( SchedulerConfiguration.class )
+ .visibleIn( configVisibility() );
+ SchedulerConfiguration defaults = assembly.forMixin( SchedulerConfiguration.class ).declareDefaults();
+ defaults.workersCount().set( DEFAULT_WORKERS_COUNT );
+ defaults.workQueueSize().set( DEFAULT_WORKQUEUE_SIZE );
}
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java
new file mode 100644
index 0000000..9a8e631
--- /dev/null
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.library.scheduler.defaults;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.zest.library.scheduler.SchedulerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultRejectionHandler
+ implements RejectedExecutionHandler
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger( SchedulerService.class );
+
+ @Override
+ public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
+ {
+ LOGGER.error( "Runnable [" + r + "] was rejected by executor [" + executor + "]" );
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java
new file mode 100644
index 0000000..f05c041
--- /dev/null
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.library.scheduler.defaults;
+
+import org.apache.zest.api.entity.EntityBuilder;
+import org.apache.zest.api.injection.scope.Service;
+import org.apache.zest.api.injection.scope.Structure;
+import org.apache.zest.api.structure.Module;
+import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.library.scheduler.SchedulerService;
+import org.apache.zest.library.scheduler.Task;
+import org.apache.zest.library.scheduler.schedule.Schedule;
+import org.apache.zest.library.scheduler.schedule.ScheduleFactory;
+import org.apache.zest.library.scheduler.schedule.cron.CronSchedule;
+import org.apache.zest.library.scheduler.schedule.once.OnceSchedule;
+import org.apache.zest.spi.uuid.UuidIdentityGeneratorService;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultScheduleFactoryMixin
+ implements ScheduleFactory
+{
+ private static final Logger logger = LoggerFactory.getLogger( ScheduleFactory.class );
+
+ @Structure
+ private Module module;
+
+ @Service
+ private SchedulerService scheduler;
+
+ @Service
+ private UuidIdentityGeneratorService uuid;
+
+ @Override
+ public CronSchedule newCronSchedule( Task task, String cronExpression, DateTime start )
+ {
+ return newPersistentCronSchedule( task, cronExpression, start );
+ }
+
+ @Override
+ public Schedule newOnceSchedule( Task task, DateTime runAt )
+ {
+ return newPersistentOnceSchedule( task, runAt );
+ }
+
+ private CronSchedule newPersistentCronSchedule( Task task, String cronExpression, DateTime start )
+ {
+ UnitOfWork uow = module.currentUnitOfWork();
+ EntityBuilder<CronSchedule> builder = uow.newEntityBuilder( CronSchedule.class );
+ CronSchedule instance = builder.instance();
+ instance.task().set( task );
+ instance.start().set( start );
+ instance.identity().set( uuid.generate( CronSchedule.class ) );
+ instance.cronExpression().set( cronExpression );
+ CronSchedule schedule = builder.newInstance();
+ logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() );
+ return schedule;
+ }
+
+ private Schedule newPersistentOnceSchedule( Task task, DateTime runAt )
+ {
+ UnitOfWork uow = module.currentUnitOfWork();
+ EntityBuilder<OnceSchedule> builder = uow.newEntityBuilder( OnceSchedule.class );
+ OnceSchedule builderInstance = builder.instance();
+ builderInstance.task().set( task );
+ builderInstance.start().set( runAt );
+ builderInstance.identity().set( uuid.generate( OnceSchedule.class ) );
+ OnceSchedule schedule = builder.newInstance();
+ logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() );
+ return schedule;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java
new file mode 100644
index 0000000..c834f50
--- /dev/null
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.library.scheduler.defaults;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zest.api.injection.scope.This;
+import org.apache.zest.library.scheduler.SchedulerService;
+
+public class DefaultThreadFactory
+ implements java.util.concurrent.ThreadFactory
+{
+ private static final AtomicInteger POOL_NUMBER = new AtomicInteger( 1 );
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger( 1 );
+ private final String namePrefix;
+
+ protected DefaultThreadFactory( @This SchedulerService me )
+ {
+ SecurityManager sm = System.getSecurityManager();
+ group = ( sm != null ) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ namePrefix = me.identity().get() + "-P" + POOL_NUMBER.getAndIncrement() + "W";
+ }
+
+ @Override
+ public Thread newThread( Runnable runnable )
+ {
+ Thread thread = new Thread( group, runnable, namePrefix + threadNumber.getAndIncrement(), 0 );
+ if( thread.isDaemon() )
+ {
+ thread.setDaemon( false );
+ }
+ if( thread.getPriority() != Thread.NORM_PRIORITY )
+ {
+ thread.setPriority( Thread.NORM_PRIORITY );
+ }
+ return thread;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
index 9427be3..d2da51b 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java
@@ -18,6 +18,8 @@
*/
package org.apache.zest.library.scheduler.schedule;
+import org.apache.zest.api.common.UseDefaults;
+import org.apache.zest.api.entity.EntityComposite;
import org.joda.time.DateTime;
import org.apache.zest.api.association.Association;
import org.apache.zest.api.entity.Identity;
@@ -28,8 +30,7 @@ import org.apache.zest.library.scheduler.Task;
/**
* Represent the scheduling of a {@link Task}.
*/
-public interface Schedule
- extends Identity
+public interface Schedule extends EntityComposite
{
/**
* @return The Association to the Task to be executed when it is time.
@@ -43,6 +44,50 @@ public interface Schedule
@Immutable
Property<DateTime> start();
+ /** Returns true if the Schedule has been cancelled.
+ *
+ * @return true if the Schedule has been cancelled.
+ */
+ @UseDefaults
+ Property<Boolean> cancelled();
+
+ /** Returns true if the Schedule is currently running.
+ *
+ * @return true if the Schedule is currently running.
+ */
+ @UseDefaults
+ Property<Boolean> running();
+
+ /** Returns the number of times the {@link Task} has been executed.
+ * <p>
+ * Each time the {@link Task#run} method completes, with or without an {@link Exception}, this
+ * counter is incremented by 1.
+ * </p>
+ *
+ * @return the number of times the {@link Task} has been executed.
+ */
+ @UseDefaults
+ Property<Long> executionCounter();
+
+ /** Returns the number of Exceptions that have occurred when running the {@link Task}.
+ * <p>
+ * Each time the {@link Task#run} method throws a {@link RuntimeException}, this property
+ * is incremented by 1.
+ * </p>
+ *
+ * @return the number of Exceptions that have occurred when running the {@link Task}.
+ */
+ @UseDefaults
+ Property<Long> exceptionCounter();
+
+ /** Returns true if the Schedule is done and will not be executed any more times.
+ *
+ * @return true if the Schedule is done and will not be executed any more times.
+ */
+ @UseDefaults
+ Property<Boolean> done();
+
+
/**
* Called just before the {@link org.apache.zest.library.scheduler.Task#run()} method is called.
*/
@@ -62,11 +107,6 @@ public interface Schedule
void taskCompletedWithException( RuntimeException ex );
/**
- * @return True if the associated {@link org.apache.zest.library.scheduler.Task} is currently running, false otherwise
- */
- boolean isTaskRunning();
-
- /**
* Compute the next time this schedule is to be run.
*
* @param from The starting time when to look for the next time it will run.
@@ -81,4 +121,5 @@ public interface Schedule
* @return A String representing this schedule.
*/
String presentationString();
+
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java
index 8506d4b..133ec1c 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java
@@ -15,114 +15,23 @@
*/
package org.apache.zest.library.scheduler.schedule;
+import org.apache.zest.api.concern.Concerns;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
+import org.apache.zest.library.scheduler.defaults.DefaultScheduleFactoryMixin;
import org.joda.time.DateTime;
-import org.apache.zest.api.entity.EntityBuilder;
-import org.apache.zest.api.injection.scope.Service;
-import org.apache.zest.api.injection.scope.Structure;
import org.apache.zest.api.mixin.Mixins;
-import org.apache.zest.api.structure.Module;
-import org.apache.zest.api.unitofwork.UnitOfWork;
-import org.apache.zest.api.value.ValueBuilder;
-import org.apache.zest.library.scheduler.SchedulerService;
import org.apache.zest.library.scheduler.Task;
-import org.apache.zest.library.scheduler.schedule.cron.CronSchedule;
-import org.apache.zest.library.scheduler.schedule.once.OnceSchedule;
-import org.apache.zest.spi.uuid.UuidIdentityGeneratorService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-@Mixins( ScheduleFactory.Mixin.class )
+import static org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation.Propagation.MANDATORY;
+
+@Mixins( DefaultScheduleFactoryMixin.class )
+@Concerns( UnitOfWorkConcern.class )
public interface ScheduleFactory
{
- Schedule newCronSchedule( Task task, String cronExpression, DateTime start, boolean durable );
-
- Schedule newOnceSchedule( Task task, DateTime runAt, boolean durable );
-
- class Mixin
- implements ScheduleFactory
- {
- private static final Logger logger = LoggerFactory.getLogger( ScheduleFactory.class );
-
- @Structure
- private Module module;
-
- @Service
- private SchedulerService scheduler;
-
- @Service
- private UuidIdentityGeneratorService uuid;
-
- @Override
- public CronSchedule newCronSchedule( Task task, String cronExpression, DateTime start, boolean durable )
- {
- if( durable )
- {
- return newPersistentCronSchedule( task, cronExpression, start );
- }
- return newTransientCronSchedule( task, cronExpression, start );
- }
-
- @Override
- public Schedule newOnceSchedule( Task task, DateTime runAt, boolean durable )
- {
- if( durable )
- {
- return newPersistentOnceSchedule( task, runAt );
- }
- return newTransientOnceSchedule( task, runAt );
- }
-
- private CronSchedule newTransientCronSchedule( Task task, String cronExpression, DateTime start )
- {
- ValueBuilder<CronSchedule> builder = module.newValueBuilder( CronSchedule.class );
- CronSchedule prototype = builder.prototype();
- prototype.task().set( task );
- prototype.start().set( start );
- prototype.identity().set( uuid.generate( CronSchedule.class ) );
- prototype.cronExpression().set( cronExpression );
- CronSchedule schedule = builder.newInstance();
- logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() );
- return schedule;
- }
-
- private CronSchedule newPersistentCronSchedule( Task task, String cronExpression, DateTime start )
- {
- UnitOfWork uow = module.currentUnitOfWork();
- EntityBuilder<CronSchedule> builder = uow.newEntityBuilder( CronSchedule.class );
- CronSchedule builderInstance = builder.instance();
- builderInstance.task().set( task );
- builderInstance.start().set( start );
- builderInstance.identity().set( uuid.generate( CronSchedule.class ) );
- builderInstance.cronExpression().set( cronExpression );
- CronSchedule schedule = builder.newInstance();
- logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() );
- return schedule;
- }
-
- private Schedule newTransientOnceSchedule( Task task, DateTime runAt )
- {
- ValueBuilder<OnceSchedule> builder = module.newValueBuilder( OnceSchedule.class );
- OnceSchedule builderInstance = builder.prototype();
- builderInstance.task().set( task );
- builderInstance.start().set( runAt );
- builderInstance.identity().set( uuid.generate( CronSchedule.class ) );
- OnceSchedule schedule = builder.newInstance();
- logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() );
- return schedule;
- }
-
- private Schedule newPersistentOnceSchedule( Task task, DateTime runAt )
- {
- UnitOfWork uow = module.currentUnitOfWork();
- EntityBuilder<OnceSchedule> builder = uow.newEntityBuilder( OnceSchedule.class );
- OnceSchedule builderInstance = builder.instance();
- builderInstance.task().set( task );
- builderInstance.start().set( runAt );
- builderInstance.identity().set( uuid.generate( CronSchedule.class ) );
- OnceSchedule schedule = builder.newInstance();
- logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() );
- return schedule;
- }
- }
+ @UnitOfWorkPropagation( MANDATORY)
+ Schedule newCronSchedule( Task task, String cronExpression, DateTime start );
+ @UnitOfWorkPropagation( MANDATORY)
+ Schedule newOnceSchedule( Task task, DateTime runAt );
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java
index 0560f9b..ed38cd6 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java
@@ -16,18 +16,17 @@
*/
package org.apache.zest.library.scheduler.schedule;
+import org.apache.zest.api.util.NullArgumentException;
+
public final class ScheduleTime
implements Comparable<ScheduleTime>
{
- public String scheduleIdentity;
- public long nextTime;
+ private String scheduleIdentity;
+ private long nextTime;
public ScheduleTime( String scheduleIdentity, long nextTime )
{
- if( scheduleIdentity == null )
- {
- throw new IllegalArgumentException( "null not allowed: " + scheduleIdentity );
- }
+ NullArgumentException.validateNotEmpty( "scheduleIdentity", scheduleIdentity );
this.scheduleIdentity = scheduleIdentity;
this.nextTime = nextTime;
}
@@ -59,6 +58,16 @@ public final class ScheduleTime
return result;
}
+ public long nextTime()
+ {
+ return nextTime;
+ }
+
+ public String scheduleIdentity()
+ {
+ return scheduleIdentity;
+ }
+
@Override
public int compareTo( ScheduleTime another )
{
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
index 84deb7a..d137cb3 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
@@ -42,24 +42,23 @@ public interface CronSchedule
implements CronSchedule
{
private static final Logger LOGGER = LoggerFactory.getLogger( Schedule.class );
- private boolean running;
@Override
public void taskStarting()
{
- running = true;
+ running().set( true );
}
@Override
public void taskCompletedSuccessfully()
{
- running = false;
+ running().set(false);
}
@Override
public void taskCompletedWithException( RuntimeException ex )
{
- running = false;
+ running().set(false);
}
@Override
@@ -69,13 +68,6 @@ public interface CronSchedule
}
@Override
- public boolean isTaskRunning()
- {
- // See SchedulerMixin.ScheduleRunner::run
- return false;
- }
-
- @Override
public long nextRun( long from )
{
long actualFrom = from;
@@ -84,11 +76,15 @@ public interface CronSchedule
{
actualFrom = firstRun;
}
- Long nextRun = new org.codeartisans.sked.cron.CronSchedule( cronExpression().get() )
- .firstRunAfter( actualFrom );
+ Long nextRun = createCron().firstRunAfter( actualFrom );
LOGGER.info( "CronSchedule::nextRun({}) is {}", from, firstRun );
return nextRun;
}
+
+ private org.codeartisans.sked.cron.CronSchedule createCron()
+ {
+ return new org.codeartisans.sked.cron.CronSchedule( cronExpression().get() );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
index 52a63ca..66fdb21 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
@@ -26,41 +26,38 @@ public interface OnceSchedule
abstract class OnceScheduleMixin
implements OnceSchedule
{
- private boolean running;
-
@Override
public void taskStarting()
{
- running = true;
+ running().set( true );
}
@Override
public void taskCompletedSuccessfully()
{
- running = false;
+ running().set( false );
}
@Override
public void taskCompletedWithException( RuntimeException ex )
{
- running = false;
- }
-
- @Override
- public boolean isTaskRunning()
- {
- return running;
+ running().set( false );
}
@Override
public long nextRun( long from )
{
+ if( done().get() )
+ {
+ return Long.MIN_VALUE;
+ }
+ done().set( true );
long runAt = start().get().getMillis();
if( runAt >= from )
{
return runAt;
}
- return -1;
+ return from;
}
@Override
@@ -69,5 +66,4 @@ public interface OnceSchedule
return start().get().toString();
}
}
-
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java
index 798e451..1c2e7e7 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java
@@ -18,7 +18,7 @@
*/
package org.apache.zest.library.scheduler.timeline;
-import org.joda.time.DateTime;
+import java.time.ZonedDateTime;
import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
/**
@@ -57,7 +57,7 @@ public interface Timeline
*/
@UnitOfWorkPropagation( UnitOfWorkPropagation.Propagation.MANDATORY )
// START SNIPPET: timeline
- Iterable<TimelineRecord> getRecords( DateTime from, DateTime to );
+ Iterable<TimelineRecord> getRecords( ZonedDateTime from, ZonedDateTime to );
// END SNIPPET: timeline
/**
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java
index 3a4c024..d66898e 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java
@@ -15,6 +15,7 @@
*/
package org.apache.zest.library.scheduler.timeline;
+import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -70,9 +71,9 @@ public class TimelineScheduleMixin
}
@Override
- public Iterable<TimelineRecord> getRecords( DateTime from, DateTime to )
+ public Iterable<TimelineRecord> getRecords( ZonedDateTime from, ZonedDateTime to )
{
- return getRecords( from.getMillis(), to.getMillis() );
+ return getRecords( from.toInstant().toEpochMilli(), to.toInstant().toEpochMilli() );
}
@Override
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java
index 1823cb0..3e097b4 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java
@@ -15,17 +15,16 @@
*/
package org.apache.zest.library.scheduler.timeline;
+import java.time.ZonedDateTime;
import java.util.SortedSet;
import java.util.TreeSet;
-import org.joda.time.DateTime;
-import org.apache.zest.api.injection.scope.Service;
import org.apache.zest.api.injection.scope.Structure;
+import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceComposite;
import org.apache.zest.api.structure.Module;
-import org.apache.zest.api.unitofwork.UnitOfWork;
import org.apache.zest.functional.Iterables;
-import org.apache.zest.library.scheduler.SchedulerMixin;
import org.apache.zest.library.scheduler.SchedulerService;
+import org.apache.zest.library.scheduler.SchedulesHandler;
import org.apache.zest.library.scheduler.schedule.Schedule;
import org.apache.zest.library.scheduler.schedule.Schedules;
@@ -39,17 +38,18 @@ public abstract class TimelineSchedulerServiceMixin
@Structure
private Module module;
- @Service
+ @This
private SchedulerService scheduler;
+ @This
+ private SchedulesHandler schedulesHandler;
+
@Override
public Iterable<TimelineRecord> getLastRecords( int maxResults )
{
SortedSet<TimelineRecord> result = new TreeSet<>();
- UnitOfWork uow = module.currentUnitOfWork();
- String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler );
- Schedules schedules = uow.get( Schedules.class, schedulesName );
+ Schedules schedules = schedulesHandler.getActiveSchedules();
for( Schedule schedule : schedules.schedules() )
{
Timeline timeline = (Timeline) schedule;
@@ -63,9 +63,7 @@ public abstract class TimelineSchedulerServiceMixin
public Iterable<TimelineRecord> getNextRecords( int maxResults )
{
SortedSet<TimelineRecord> result = new TreeSet<>();
- UnitOfWork uow = module.currentUnitOfWork();
- String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler );
- Schedules schedules = uow.get( Schedules.class, schedulesName );
+ Schedules schedules = schedulesHandler.getActiveSchedules();
for( Schedule schedule : schedules.schedules() )
{
Timeline timeline = (Timeline) schedule;
@@ -76,13 +74,11 @@ public abstract class TimelineSchedulerServiceMixin
}
@Override
- public Iterable<TimelineRecord> getRecords( DateTime from, DateTime to )
+ public Iterable<TimelineRecord> getRecords( ZonedDateTime from, ZonedDateTime to )
{
SortedSet<TimelineRecord> result = new TreeSet<>();
- UnitOfWork uow = module.currentUnitOfWork();
- String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler );
- Schedules schedules = uow.get( Schedules.class, schedulesName );
+ Schedules schedules = schedulesHandler.getActiveSchedules();
for( Schedule schedule : schedules.schedules() )
{
Timeline timeline = (Timeline) schedule;
@@ -97,9 +93,7 @@ public abstract class TimelineSchedulerServiceMixin
{
SortedSet<TimelineRecord> result = new TreeSet<>();
- UnitOfWork uow = module.currentUnitOfWork();
- String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler );
- Schedules schedules = uow.get( Schedules.class, schedulesName );
+ Schedules schedules = schedulesHandler.getActiveSchedules();
for( Schedule schedule : schedules.schedules() )
{
Timeline timeline = (Timeline) schedule;
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java
index ca5b8bd..02d5636 100644
--- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java
@@ -18,12 +18,18 @@
package org.apache.zest.library.scheduler;
import org.apache.zest.api.entity.EntityBuilder;
+import org.apache.zest.api.entity.IdentityGenerator;
import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.api.value.ValueSerialization;
import org.apache.zest.bootstrap.AssemblyException;
import org.apache.zest.bootstrap.ModuleAssembly;
+import org.apache.zest.bootstrap.ServiceDeclaration;
+import org.apache.zest.entitystore.memory.MemoryEntityStoreService;
import org.apache.zest.index.rdf.assembly.RdfMemoryStoreAssembler;
+import org.apache.zest.spi.uuid.UuidIdentityGeneratorService;
import org.apache.zest.test.AbstractZestTest;
import org.apache.zest.test.EntityTestAssembler;
+import org.apache.zest.valueserialization.orgjson.OrgJsonValueSerializationService;
public abstract class AbstractSchedulerTest
extends AbstractZestTest
@@ -34,7 +40,9 @@ public abstract class AbstractSchedulerTest
{
assembly.entities( FooTask.class );
- new EntityTestAssembler().assemble( assembly );
+ assembly.services( MemoryEntityStoreService.class );
+ assembly.services( UuidIdentityGeneratorService.class).withMixins( CountingIdentityGeneratorService.class );
+ assembly.services( OrgJsonValueSerializationService.class ).taggedWith( ValueSerialization.Formats.JSON );
new RdfMemoryStoreAssembler().assemble( assembly );
onAssembly( assembly );
@@ -51,4 +59,16 @@ public abstract class AbstractSchedulerTest
task.input().set( input );
return builder.newInstance();
}
+
+ public static class CountingIdentityGeneratorService
+ implements IdentityGenerator
+ {
+ int counter = 0;
+
+ @Override
+ public String generate( Class<?> compositeType )
+ {
+ return compositeType.getSimpleName() + ":" + counter++;
+ }
+ }
}
[2/6] zest-java git commit: ZEST-128 - Fixed up Scheduler library so
that schedules are not lost. Tried to introduce better separation of
concerns. Added counters for Execution and Exceptions.
Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/FooTask.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/FooTask.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/FooTask.java
index f4a5243..63fbee4 100644
--- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/FooTask.java
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/FooTask.java
@@ -18,13 +18,17 @@
package org.apache.zest.library.scheduler;
import org.apache.zest.api.common.Optional;
+import org.apache.zest.api.common.UseDefaults;
import org.apache.zest.api.entity.Identity;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.mixin.Mixins;
import org.apache.zest.api.property.Property;
+import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation.Propagation.REQUIRES_NEW;
+
@Mixins( FooTask.Mixin.class )
public interface FooTask
extends Task, Identity
@@ -34,10 +38,12 @@ public interface FooTask
@Optional
Property<String> output();
- public static abstract class Mixin
- implements Runnable
- {
+ @UseDefaults
+ Property<Integer> runCounter();
+ abstract class Mixin
+ implements Task
+ {
private static final Logger LOGGER = LoggerFactory.getLogger( FooTask.class );
@This
@@ -47,6 +53,12 @@ public interface FooTask
public void run()
{
LOGGER.info( "FooTask.run({})", me.input().get() );
+ synchronized( this )
+ {
+ me.runCounter().set( me.runCounter().get() + 1 );
+ LOGGER.info( "Identity: " + me.identity().get() );
+ LOGGER.info( " Counter: " + me.runCounter().get() );
+ }
if( me.input().get().equals( Constants.BAZAR ) )
{
if( me.output().get() == null )
@@ -60,5 +72,4 @@ public interface FooTask
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
index 550eff9..b74279c 100644
--- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
@@ -19,9 +19,6 @@
package org.apache.zest.library.scheduler;
import java.util.concurrent.Callable;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.junit.Test;
import org.apache.zest.api.common.Visibility;
import org.apache.zest.api.unitofwork.UnitOfWork;
import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
@@ -30,18 +27,23 @@ import org.apache.zest.api.usecase.UsecaseBuilder;
import org.apache.zest.bootstrap.AssemblyException;
import org.apache.zest.bootstrap.ModuleAssembly;
import org.apache.zest.library.scheduler.bootstrap.SchedulerAssembler;
+import org.apache.zest.library.scheduler.schedule.Schedule;
import org.apache.zest.library.scheduler.timeline.Timeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertThat;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.zest.functional.Iterables.count;
import static org.apache.zest.library.scheduler.Constants.BAR;
import static org.apache.zest.library.scheduler.Constants.BAZAR;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
public class SchedulerTest
extends AbstractSchedulerTest
@@ -52,8 +54,12 @@ public class SchedulerTest
protected void onAssembly( ModuleAssembly testAssembly )
throws AssemblyException
{
+ @SuppressWarnings( "UnnecessaryLocalVariable" )
ModuleAssembly moduleAssembly = testAssembly;
+
+ @SuppressWarnings( "UnnecessaryLocalVariable" )
ModuleAssembly configModuleAssembly = testAssembly;
+
// START SNIPPET: assembly
new SchedulerAssembler().visibleIn( Visibility.application )
.withConfig( configModuleAssembly, Visibility.layer )
@@ -78,6 +84,7 @@ public class SchedulerTest
try( UnitOfWork uow = module.newUnitOfWork( usecase ) )
{
FooTask task = uow.get( FooTask.class, taskId );
+ assertThat( task.runCounter().get(), equalTo( 1 ) );
assertThat( task.output().get(), equalTo( BAR ) );
}
}
@@ -98,7 +105,7 @@ public class SchedulerTest
taskIdentity = task.identity().get();
DateTime expectedRun = start.withMillisOfSecond( 0 ).withSecondOfMinute( 0 ).plusMinutes( 1 );
- scheduler.scheduleCron( task, "@minutely", true );
+ scheduler.scheduleCron( task, "@minutely" );
uow.complete();
@@ -106,10 +113,11 @@ public class SchedulerTest
LOGGER.info( "Task scheduled on {} to be run at {}", start.getMillis(), expectedRun.getMillis() );
}
- await( usecase.name() ).
- atMost( sleepMillis + 5000, MILLISECONDS ).
- until( taskOutput( taskIdentity ), equalTo( BAR ) );
+ await( usecase.name() )
+ .atMost( sleepMillis + 5000, MILLISECONDS )
+ .until( taskOutput( taskIdentity ), equalTo( 1 ) );
+ //noinspection unused
try( UnitOfWork uow = module.newUnitOfWork( usecase ) )
{
Timeline timeline = module.findService( Timeline.class ).get();
@@ -135,38 +143,65 @@ public class SchedulerTest
@Test
public void testOnce()
- throws UnitOfWorkCompletionException
+ throws UnitOfWorkCompletionException, InterruptedException
{
+ System.setProperty( "zest.entity.print.state", Boolean.TRUE.toString() );
final Usecase usecase = UsecaseBuilder.newUsecase( "TestOnce" );
final String taskIdentity;
+ Scheduler scheduler = module.findService( Scheduler.class ).get();
+
+ Schedule schedule1;
+ Schedule schedule2;
+ Schedule schedule3;
+ Schedule schedule4;
try( UnitOfWork uow = module.newUnitOfWork( usecase ) )
{
- Scheduler scheduler = module.findService( Scheduler.class ).get();
-
FooTask task = createFooTask( uow, usecase.name(), BAZAR );
taskIdentity = task.identity().get();
- scheduler.scheduleOnce( task, 2, true );
+ schedule1 = scheduler.scheduleOnce( task, 1 );
+ schedule2 = scheduler.scheduleOnce( task, 2 );
+ schedule3 = scheduler.scheduleOnce( task, 3 );
+ schedule4 = scheduler.scheduleOnce( task, 4 );
uow.complete();
}
+ Thread.sleep(5000);
+ await( usecase.name() )
+ .atMost( 30, SECONDS )
+ .until( taskOutput( taskIdentity ), equalTo( 4 ) );
- await( usecase.name() ).until( taskOutput( taskIdentity ), equalTo( BAR ) );
+ try( UnitOfWork uow = module.newUnitOfWork( usecase ) )
+ {
+ schedule1 = uow.get( schedule1 );
+ schedule2 = uow.get( schedule2 );
+ schedule3 = uow.get( schedule3 );
+ schedule4 = uow.get( schedule4 );
+ assertThat(schedule1.cancelled().get(), equalTo( false ));
+ assertThat(schedule2.cancelled().get(), equalTo( false ));
+ assertThat(schedule3.cancelled().get(), equalTo( false ));
+ assertThat(schedule4.cancelled().get(), equalTo( false ));
+ assertThat(schedule1.done().get(), equalTo( true ));
+ assertThat(schedule2.done().get(), equalTo( true ));
+ assertThat(schedule3.done().get(), equalTo( true ));
+ assertThat(schedule4.done().get(), equalTo( true ));
+ assertThat(schedule1.running().get(), equalTo( false ));
+ assertThat(schedule2.running().get(), equalTo( false ));
+ assertThat(schedule3.running().get(), equalTo( false ));
+ assertThat(schedule4.running().get(), equalTo( false ));
+ }
}
- private Callable<String> taskOutput( final String taskIdentity )
+ private Callable<Integer> taskOutput( final String taskIdentity )
{
- return new Callable<String>()
- {
- @Override
- public String call()
- throws Exception
+ return () -> {
+ try( UnitOfWork uow = module.newUnitOfWork() )
{
- try( UnitOfWork uow = module.newUnitOfWork() )
- {
- FooTask task = uow.get( FooTask.class, taskIdentity );
- return task == null ? null : task.output().get();
- }
+ FooTask task = uow.get( FooTask.class, taskIdentity );
+ Integer count = task.runCounter().get();
+ System.out.println("Count reached: " + count);
+ uow.discard();
+ return count;
}
};
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
index d0774f5..6de65b7 100644
--- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/docsupport/SchedulerDocs.java
@@ -41,7 +41,7 @@ public class SchedulerDocs
public void method()
{
MyTaskEntity myTask = todo();
- Schedule schedule = scheduler.scheduleOnce( myTask, 10, false );
+ Schedule schedule = scheduler.scheduleOnce( myTask, 10 );
// myTask will be run in 10 seconds from now
}
[5/6] zest-java git commit: ZEST-128 - Cleaning up some println()
Posted by ni...@apache.org.
ZEST-128 - Cleaning up some println()
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/5d51f37a
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/5d51f37a
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/5d51f37a
Branch: refs/heads/develop
Commit: 5d51f37ac6c68138c40d0329a1201b6a80a81960
Parents: 8214c84
Author: Niclas Hedhman <ni...@hedhman.org>
Authored: Sat Nov 14 11:20:42 2015 +0800
Committer: Niclas Hedhman <ni...@hedhman.org>
Committed: Sat Nov 14 11:20:42 2015 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/zest/library/scheduler/Execution.java | 1 -
.../test/java/org/apache/zest/library/scheduler/SchedulerTest.java | 1 -
2 files changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zest-java/blob/5d51f37a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
index dbb3b72..8aecbe5 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
@@ -113,7 +113,6 @@ public interface Execution
catch( InterruptedException e )
{
// Ignore. Used to signal "Hey, wake up. Time to work..."
- System.out.println("Interrupted");
}
}
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/5d51f37a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
index b74279c..7bc8aa5 100644
--- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/SchedulerTest.java
@@ -199,7 +199,6 @@ public class SchedulerTest
{
FooTask task = uow.get( FooTask.class, taskIdentity );
Integer count = task.runCounter().get();
- System.out.println("Count reached: " + count);
uow.discard();
return count;
}