You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2016/12/20 22:23:34 UTC
incubator-batchee git commit: BATCHEE-113 improve CDI scope for Jobs and Steps
Repository: incubator-batchee
Updated Branches:
refs/heads/master 8268e3d6a -> ce60c30e8
BATCHEE-113 improve CDI scope for Jobs and Steps
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/ce60c30e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/ce60c30e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/ce60c30e
Branch: refs/heads/master
Commit: ce60c30e89d92f756d1c0eaf6e26fa0d5e393e14
Parents: 8268e3d
Author: Mark Struberg <st...@apache.org>
Authored: Tue Dec 20 23:22:48 2016 +0100
Committer: Mark Struberg <st...@apache.org>
Committed: Tue Dec 20 23:22:48 2016 +0100
----------------------------------------------------------------------
.../apache/batchee/cdi/impl/BaseContext.java | 45 ++++++-------
.../batchee/cdi/impl/BatchEEScopeExtension.java | 24 ++++---
.../apache/batchee/cdi/impl/JobContextImpl.java | 53 ++++++++-------
.../apache/batchee/cdi/impl/LocationHolder.java | 71 --------------------
.../batchee/cdi/impl/StepContextImpl.java | 50 ++++----------
.../cdi/listener/AfterJobScopeListener.java | 11 +--
.../cdi/listener/AfterStepScopeListener.java | 13 ++--
.../cdi/listener/BeforeJobScopeListener.java | 11 +--
.../cdi/listener/BeforeStepScopeListener.java | 16 +++--
9 files changed, 109 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
index e76ca88..82abffc 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
@@ -24,13 +24,16 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-public abstract class BaseContext<K> implements Context {
- private final ConcurrentMap<K, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<K, ConcurrentMap<Contextual<?>, Instance<?>>>();
+public abstract class BaseContext implements Context {
+ /**
+ * key == either the stepExecutionId or the jobExecutionId
+ */
+ private ConcurrentMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>>();
/**
* @return current keys (we inherit contexts here) sorted by order (the last is the most specific)
*/
- protected abstract K[] currentKeys();
+ protected abstract Long currentKey();
@Override
public <T> T get(final Contextual<T> component, final CreationalContext<T> creationalContext) {
@@ -56,45 +59,35 @@ public abstract class BaseContext<K> implements Context {
public <T> T get(final Contextual<T> component) {
checkActive();
- for (final K key : currentKeys()) {
- final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
- if (storage != null) {
- final Instance<?> instance = storage.get(component);
- if (instance == null) {
- return null;
- }
- return (T) instance.value;
+ final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(currentKey());
+ if (storage != null) {
+ final Instance<?> instance = storage.get(component);
+ if (instance == null) {
+ return null;
}
+ return (T) instance.value;
}
return null;
}
@Override
public boolean isActive() {
- final K[] ks = currentKeys();
- return ks != null && ks.length != 0;
+ return currentKey() != null;
}
- public void endContext() {
- final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.remove(lastKey());
+ public void endContext(Long key) {
+ final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.remove(key);
if (storage == null) {
return;
}
for (final Map.Entry<Contextual<?>, Instance<?>> entry : storage.entrySet()) {
final Instance<?> instance = entry.getValue();
- Contextual.class.cast(entry.getKey()).destroy(instance.value, instance.context);
+ Contextual.class.cast(entry.getKey()).destroy(instance.value, instance.cc);
}
storage.clear();
}
- private K lastKey() {
- final K[] keys = currentKeys();
- if (keys == null || keys.length == 0) {
- return null;
- }
- return keys[keys.length - 1];
- }
private void checkActive() {
if (!isActive()) {
@@ -103,7 +96,7 @@ public abstract class BaseContext<K> implements Context {
}
private ConcurrentMap<Contextual<?>, Instance<?>> getOrCreateCurrentStorage() {
- final K key = lastKey();
+ final Long key = currentKey();
ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
if (storage == null) {
@@ -118,11 +111,11 @@ public abstract class BaseContext<K> implements Context {
private static class Instance<T> {
private final T value;
- private final CreationalContext<T> context;
+ private final CreationalContext<T> cc;
private Instance(final T value, final CreationalContext<T> context) {
this.value = value;
- this.context = context;
+ this.cc = context;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
index ba63756..e872b9a 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
@@ -20,17 +20,25 @@ import javax.enterprise.event.Observes;
import javax.enterprise.inject.spi.AfterBeanDiscovery;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.Extension;
-import javax.enterprise.inject.spi.ProcessAnnotatedType;
public class BatchEEScopeExtension implements Extension {
- void vetoInternalBeans(final @Observes ProcessAnnotatedType<?> pat) {
- if (pat.getAnnotatedType().getJavaClass().getName().startsWith(BatchEEScopeExtension.class.getPackage().getName())) {
- pat.veto();
- }
- }
+
+ private JobContextImpl jobContext;
+ private StepContextImpl stepContext;
void addBatchScopes(final @Observes AfterBeanDiscovery afterBeanDiscovery, final BeanManager bm) {
- afterBeanDiscovery.addContext(JobContextImpl.INSTANCE);
- afterBeanDiscovery.addContext(StepContextImpl.INSTANCE);
+ jobContext = new JobContextImpl();
+ stepContext = new StepContextImpl();
+
+ afterBeanDiscovery.addContext(jobContext);
+ afterBeanDiscovery.addContext(stepContext);
+ }
+
+ public JobContextImpl getJobContext() {
+ return jobContext;
+ }
+
+ public StepContextImpl getStepContext() {
+ return stepContext;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
index aab48e3..910e00c 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
@@ -19,15 +19,15 @@ package org.apache.batchee.cdi.impl;
import org.apache.batchee.cdi.scope.JobScoped;
import java.lang.annotation.Annotation;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.batchee.cdi.impl.LocationHolder.currentJob;
-public class JobContextImpl extends BaseContext<JobContextImpl.JobKey> {
- public static final BaseContext<?> INSTANCE = new JobContextImpl();
+public class JobContextImpl extends BaseContext {
- private JobContextImpl() {
- // no-op
- }
+ private ConcurrentMap<Long, AtomicInteger> jobReferences = new ConcurrentHashMap<Long, AtomicInteger>();
+ private ThreadLocal<Long> currentJobExecutionId = new ThreadLocal<Long>();
@Override
public Class<? extends Annotation> getScope() {
@@ -35,31 +35,36 @@ public class JobContextImpl extends BaseContext<JobContextImpl.JobKey> {
}
@Override
- protected JobKey[] currentKeys() {
- return new JobKey[] { new JobKey(currentJob().getExecutionId()) };
+ protected Long currentKey() {
+ return currentJobExecutionId.get();
}
- public static class JobKey {
- private final long executionId;
-
- private final int hashCode;
-
- public JobKey(final long executionId) {
- this.executionId = executionId;
- hashCode = (int) (executionId ^ (executionId >>> 32));
+ public void enterJobExecution(Long jobExecutionId) {
+ AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
+ if (jobRefs == null) {
+ jobRefs = new AtomicInteger(0);
+ AtomicInteger oldJobRefs = jobReferences.putIfAbsent(jobExecutionId, jobRefs);
+ if (oldJobRefs != null) {
+ jobRefs = oldJobRefs;
+ }
}
+ jobRefs.incrementAndGet();
- @Override
- public boolean equals(final Object o) {
- return this == o
- || (!(o == null || getClass() != o.getClass()) && executionId == JobKey.class.cast(o).executionId);
+ currentJobExecutionId.set(jobExecutionId);
+ }
+ public void exitJobExecution() {
+ Long jobExecutionId = currentJobExecutionId.get();
+ AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
+ if (jobRefs != null) {
+ int references = jobRefs.decrementAndGet();
+ if (references == 0) {
+ endContext(jobExecutionId);
+ }
}
- @Override
- public int hashCode() {
- return hashCode;
- }
+ currentJobExecutionId.set(null);
+ currentJobExecutionId.remove();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
deleted file mode 100644
index 92285dc..0000000
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.cdi.impl;
-
-import javax.batch.runtime.context.JobContext;
-import javax.batch.runtime.context.StepContext;
-import java.util.LinkedList;
-
-public abstract class LocationHolder {
- private static final StashThreadLocal<JobContext> JOB = new StashThreadLocal<JobContext>();
- private static final StashThreadLocal<StepContext> STEP = new StashThreadLocal<StepContext>();
-
- protected static void enterJob(final JobContext jc) {
- JOB.get().add(jc);
- }
-
- protected static void enterStep(final StepContext sc) {
- STEP.get().add(sc);
- }
-
- protected static void exitStep(final BaseContext<?> context) {
- cleanUp(context, STEP);
- }
-
- protected static void exitJob(final BaseContext<?> context) {
- cleanUp(context, JOB);
- }
-
- public static JobContext currentJob() {
- final LinkedList<JobContext> jobContexts = JOB.get();
- if (jobContexts.isEmpty()) {
- throw new IllegalStateException("No job registered, did you set the job listener?");
- }
- return jobContexts.getLast();
- }
-
- public static LinkedList<StepContext> currentSteps() {
- return STEP.get();
- }
-
- private static <T, K> void cleanUp(final BaseContext<K> context, final StashThreadLocal<T> stash) {
- context.endContext();
-
- final LinkedList<T> stepContexts = stash.get();
- stepContexts.removeLast();
- if (stepContexts.isEmpty()) {
- stash.remove();
- }
- }
-
- private static class StashThreadLocal<T> extends ThreadLocal<LinkedList<T>> {
- @Override
- public LinkedList<T> initialValue() {
- return new LinkedList<T>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
index 3c85d55..6e97c00 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
@@ -18,18 +18,11 @@ package org.apache.batchee.cdi.impl;
import org.apache.batchee.cdi.scope.StepScoped;
-import javax.batch.runtime.context.StepContext;
import java.lang.annotation.Annotation;
-import java.util.List;
-import static org.apache.batchee.cdi.impl.LocationHolder.currentSteps;
+public class StepContextImpl extends BaseContext {
-public class StepContextImpl extends BaseContext<StepContextImpl.StepKey> {
- public static final BaseContext<?> INSTANCE = new StepContextImpl();
-
- private StepContextImpl() {
- // no-op
- }
+ private ThreadLocal<Long> currentStepContext = new ThreadLocal<Long>();
@Override
public Class<? extends Annotation> getScope() {
@@ -37,38 +30,21 @@ public class StepContextImpl extends BaseContext<StepContextImpl.StepKey> {
}
@Override
- protected StepKey[] currentKeys() {
- final List<StepContext> stepContexts = currentSteps();
- final StepKey[] keys = new StepKey[stepContexts.size()];
-
- int i = 0;
- for (final StepContext stepContext : stepContexts) {
- keys[i++] = new StepKey(stepContext.getStepExecutionId());
- }
- return keys;
+ protected Long currentKey() {
+ return currentStepContext.get();
}
- public static class StepKey {
- private final long stepExecutionId;
-
- private final int hashCode;
-
- public StepKey(final long stepExecutionId) {
- this.stepExecutionId = stepExecutionId;
+ public void enterStep(final Long stepContextId) {
+ currentStepContext.set(stepContextId);
+ }
- hashCode = (int) (stepExecutionId ^ (stepExecutionId >>> 32));
- }
+ public void exitStep() {
+ Long stepContextId = currentKey();
+ endContext(stepContextId);
+ currentStepContext.set(null);
+ currentStepContext.remove();
+ }
- @Override
- public boolean equals(final Object o) {
- return this == o
- || (!(o == null || getClass() != o.getClass()) && stepExecutionId == StepKey.class.cast(o).stepExecutionId);
- }
- @Override
- public int hashCode() {
- return hashCode;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
index 041a0e7..10151a5 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
@@ -16,14 +16,17 @@
*/
package org.apache.batchee.cdi.listener;
-import org.apache.batchee.cdi.impl.JobContextImpl;
-import org.apache.batchee.cdi.impl.LocationHolder;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
import javax.batch.api.listener.JobListener;
+import javax.inject.Inject;
import javax.inject.Named;
@Named
-public class AfterJobScopeListener extends LocationHolder implements JobListener {
+public class AfterJobScopeListener implements JobListener {
+
+ private @Inject BatchEEScopeExtension scopeExtension;
+
@Override
public void beforeJob() throws Exception {
// no-op
@@ -31,6 +34,6 @@ public class AfterJobScopeListener extends LocationHolder implements JobListener
@Override
public void afterJob() throws Exception {
- exitJob(JobContextImpl.INSTANCE);
+ scopeExtension.getJobContext().exitJobExecution();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
index aa1fb21..bcaaacb 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
@@ -16,14 +16,19 @@
*/
package org.apache.batchee.cdi.listener;
-import org.apache.batchee.cdi.impl.LocationHolder;
-import org.apache.batchee.cdi.impl.StepContextImpl;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
import javax.batch.api.listener.StepListener;
+import javax.enterprise.context.Dependent;
+import javax.inject.Inject;
import javax.inject.Named;
@Named
-public class AfterStepScopeListener extends LocationHolder implements StepListener {
+@Dependent
+public class AfterStepScopeListener implements StepListener {
+
+ private @Inject BatchEEScopeExtension scopeExtension;
+
@Override
public void beforeStep() throws Exception {
// no-op
@@ -31,6 +36,6 @@ public class AfterStepScopeListener extends LocationHolder implements StepListen
@Override
public void afterStep() throws Exception {
- exitStep(StepContextImpl.INSTANCE);
+ scopeExtension.getStepContext().exitStep();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
index 4b54729..3f09894 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
@@ -16,7 +16,7 @@
*/
package org.apache.batchee.cdi.listener;
-import org.apache.batchee.cdi.impl.LocationHolder;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
import javax.batch.api.listener.JobListener;
import javax.batch.runtime.context.JobContext;
@@ -24,13 +24,14 @@ import javax.inject.Inject;
import javax.inject.Named;
@Named
-public class BeforeJobScopeListener extends LocationHolder implements JobListener {
- @Inject
- private JobContext jobContext;
+public class BeforeJobScopeListener implements JobListener {
+
+ private @Inject JobContext jobContext;
+ private @Inject BatchEEScopeExtension scopeExtension;
@Override
public void beforeJob() throws Exception {
- enterJob(jobContext);
+ scopeExtension.getJobContext().enterJobExecution(jobContext.getExecutionId());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
index fefc074..6ffc75c 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
@@ -16,25 +16,29 @@
*/
package org.apache.batchee.cdi.listener;
-import org.apache.batchee.cdi.impl.LocationHolder;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
import javax.batch.api.listener.StepListener;
import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
@Named
-public class BeforeStepScopeListener extends LocationHolder implements StepListener {
- @Inject
- private StepContext stepContext;
+@Dependent
+public class BeforeStepScopeListener implements StepListener {
+
+ private @Inject StepContext stepContext;
+ private @Inject BatchEEScopeExtension scopeExtension;
+
@Override
public void beforeStep() throws Exception {
- enterStep(stepContext);
+ scopeExtension.getStepContext().enterStep(stepContext.getStepExecutionId());
}
@Override
public void afterStep() throws Exception {
- // no-op
+ // no op
}
}