You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2016/12/20 22:23:34 UTC

incubator-batchee git commit: BATCHEE-113 improve CDI scope for Jobs and Steps

Repository: incubator-batchee
Updated Branches:
  refs/heads/master 8268e3d6a -> ce60c30e8


BATCHEE-113 improve CDI scope for Jobs and Steps


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/ce60c30e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/ce60c30e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/ce60c30e

Branch: refs/heads/master
Commit: ce60c30e89d92f756d1c0eaf6e26fa0d5e393e14
Parents: 8268e3d
Author: Mark Struberg <st...@apache.org>
Authored: Tue Dec 20 23:22:48 2016 +0100
Committer: Mark Struberg <st...@apache.org>
Committed: Tue Dec 20 23:22:48 2016 +0100

----------------------------------------------------------------------
 .../apache/batchee/cdi/impl/BaseContext.java    | 45 ++++++-------
 .../batchee/cdi/impl/BatchEEScopeExtension.java | 24 ++++---
 .../apache/batchee/cdi/impl/JobContextImpl.java | 53 ++++++++-------
 .../apache/batchee/cdi/impl/LocationHolder.java | 71 --------------------
 .../batchee/cdi/impl/StepContextImpl.java       | 50 ++++----------
 .../cdi/listener/AfterJobScopeListener.java     | 11 +--
 .../cdi/listener/AfterStepScopeListener.java    | 13 ++--
 .../cdi/listener/BeforeJobScopeListener.java    | 11 +--
 .../cdi/listener/BeforeStepScopeListener.java   | 16 +++--
 9 files changed, 109 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
index e76ca88..82abffc 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
@@ -24,13 +24,16 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-public abstract class BaseContext<K> implements Context {
-    private final ConcurrentMap<K, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<K, ConcurrentMap<Contextual<?>, Instance<?>>>();
+public abstract class BaseContext implements Context {
+    /**
+     * key == either the stepExecutionId or the jobExecutionId
+     */
+    private ConcurrentMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>>();
 
     /**
      * @return current keys (we inherit contexts here) sorted by order (the last is the most specific)
      */
-    protected abstract K[] currentKeys();
+    protected abstract Long currentKey();
 
     @Override
     public <T> T get(final Contextual<T> component, final CreationalContext<T> creationalContext) {
@@ -56,45 +59,35 @@ public abstract class BaseContext<K> implements Context {
     public <T> T get(final Contextual<T> component) {
         checkActive();
 
-        for (final K key : currentKeys()) {
-            final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
-            if (storage != null) {
-                final Instance<?> instance = storage.get(component);
-                if (instance == null) {
-                    return null;
-                }
-                return (T) instance.value;
+        final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(currentKey());
+        if (storage != null) {
+            final Instance<?> instance = storage.get(component);
+            if (instance == null) {
+                return null;
             }
+            return (T) instance.value;
         }
         return null;
     }
 
     @Override
     public boolean isActive() {
-        final K[] ks = currentKeys();
-        return ks != null && ks.length != 0;
+        return currentKey() != null;
     }
 
-    public void endContext() {
-        final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.remove(lastKey());
+    public void endContext(Long key) {
+        final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.remove(key);
         if (storage == null) {
             return;
         }
 
         for (final Map.Entry<Contextual<?>, Instance<?>> entry : storage.entrySet()) {
             final Instance<?> instance = entry.getValue();
-            Contextual.class.cast(entry.getKey()).destroy(instance.value, instance.context);
+            Contextual.class.cast(entry.getKey()).destroy(instance.value, instance.cc);
         }
         storage.clear();
     }
 
-    private K lastKey() {
-        final K[] keys = currentKeys();
-        if (keys == null || keys.length == 0) {
-            return null;
-        }
-        return keys[keys.length - 1];
-    }
 
     private void checkActive() {
         if (!isActive()) {
@@ -103,7 +96,7 @@ public abstract class BaseContext<K> implements Context {
     }
 
     private ConcurrentMap<Contextual<?>, Instance<?>> getOrCreateCurrentStorage() {
-        final K key = lastKey();
+        final Long key = currentKey();
 
         ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
         if (storage == null) {
@@ -118,11 +111,11 @@ public abstract class BaseContext<K> implements Context {
 
     private static class Instance<T> {
         private final T value;
-        private final CreationalContext<T> context;
+        private final CreationalContext<T> cc;
 
         private Instance(final T value, final CreationalContext<T> context) {
             this.value = value;
-            this.context = context;
+            this.cc = context;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
index ba63756..e872b9a 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
@@ -20,17 +20,25 @@ import javax.enterprise.event.Observes;
 import javax.enterprise.inject.spi.AfterBeanDiscovery;
 import javax.enterprise.inject.spi.BeanManager;
 import javax.enterprise.inject.spi.Extension;
-import javax.enterprise.inject.spi.ProcessAnnotatedType;
 
 public class BatchEEScopeExtension implements Extension {
-    void vetoInternalBeans(final @Observes ProcessAnnotatedType<?> pat) {
-        if (pat.getAnnotatedType().getJavaClass().getName().startsWith(BatchEEScopeExtension.class.getPackage().getName())) {
-            pat.veto();
-        }
-    }
+
+    private JobContextImpl jobContext;
+    private StepContextImpl stepContext;
 
     void addBatchScopes(final @Observes AfterBeanDiscovery afterBeanDiscovery, final BeanManager bm) {
-        afterBeanDiscovery.addContext(JobContextImpl.INSTANCE);
-        afterBeanDiscovery.addContext(StepContextImpl.INSTANCE);
+        jobContext = new JobContextImpl();
+        stepContext = new StepContextImpl();
+
+        afterBeanDiscovery.addContext(jobContext);
+        afterBeanDiscovery.addContext(stepContext);
+    }
+
+    public JobContextImpl getJobContext() {
+        return jobContext;
+    }
+
+    public StepContextImpl getStepContext() {
+        return stepContext;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
index aab48e3..910e00c 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
@@ -19,15 +19,15 @@ package org.apache.batchee.cdi.impl;
 import org.apache.batchee.cdi.scope.JobScoped;
 
 import java.lang.annotation.Annotation;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.batchee.cdi.impl.LocationHolder.currentJob;
 
-public class JobContextImpl extends BaseContext<JobContextImpl.JobKey> {
-    public static final BaseContext<?> INSTANCE = new JobContextImpl();
+public class JobContextImpl extends BaseContext {
 
-    private JobContextImpl() {
-        // no-op
-    }
+    private ConcurrentMap<Long, AtomicInteger> jobReferences = new ConcurrentHashMap<Long, AtomicInteger>();
+    private ThreadLocal<Long> currentJobExecutionId = new ThreadLocal<Long>();
 
     @Override
     public Class<? extends Annotation> getScope() {
@@ -35,31 +35,36 @@ public class JobContextImpl extends BaseContext<JobContextImpl.JobKey> {
     }
 
     @Override
-    protected JobKey[] currentKeys() {
-        return new JobKey[] { new JobKey(currentJob().getExecutionId()) };
+    protected Long currentKey() {
+        return currentJobExecutionId.get();
     }
 
-    public static class JobKey {
-        private final long executionId;
-
-        private final int hashCode;
-
-        public JobKey(final long executionId) {
-            this.executionId = executionId;
 
-            hashCode = (int) (executionId ^ (executionId >>> 32));
+    public void enterJobExecution(Long jobExecutionId) {
+        AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
+        if (jobRefs == null) {
+            jobRefs = new AtomicInteger(0);
+            AtomicInteger oldJobRefs = jobReferences.putIfAbsent(jobExecutionId, jobRefs);
+            if (oldJobRefs != null) {
+                jobRefs = oldJobRefs;
+            }
         }
+        jobRefs.incrementAndGet();
 
-        @Override
-        public boolean equals(final Object o) {
-            return this == o
-                || (!(o == null || getClass() != o.getClass()) && executionId == JobKey.class.cast(o).executionId);
+        currentJobExecutionId.set(jobExecutionId);
+    }
 
+    public void exitJobExecution() {
+        Long jobExecutionId = currentJobExecutionId.get();
+        AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
+        if (jobRefs != null) {
+            int references = jobRefs.decrementAndGet();
+            if (references == 0) {
+                endContext(jobExecutionId);
+            }
         }
 
-        @Override
-        public int hashCode() {
-            return hashCode;
-        }
+        currentJobExecutionId.set(null);
+        currentJobExecutionId.remove();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
deleted file mode 100644
index 92285dc..0000000
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.cdi.impl;
-
-import javax.batch.runtime.context.JobContext;
-import javax.batch.runtime.context.StepContext;
-import java.util.LinkedList;
-
-public abstract class LocationHolder {
-    private static final StashThreadLocal<JobContext> JOB = new StashThreadLocal<JobContext>();
-    private static final StashThreadLocal<StepContext> STEP = new StashThreadLocal<StepContext>();
-
-    protected static void enterJob(final JobContext jc) {
-        JOB.get().add(jc);
-    }
-
-    protected static void enterStep(final StepContext sc) {
-        STEP.get().add(sc);
-    }
-
-    protected static void exitStep(final BaseContext<?> context) {
-        cleanUp(context, STEP);
-    }
-
-    protected static void exitJob(final BaseContext<?> context) {
-        cleanUp(context, JOB);
-    }
-
-    public static JobContext currentJob() {
-        final LinkedList<JobContext> jobContexts = JOB.get();
-        if (jobContexts.isEmpty()) {
-            throw new IllegalStateException("No job registered, did you set the job listener?");
-        }
-        return jobContexts.getLast();
-    }
-
-    public static LinkedList<StepContext> currentSteps() {
-        return STEP.get();
-    }
-
-    private static <T, K> void cleanUp(final BaseContext<K> context, final StashThreadLocal<T> stash) {
-        context.endContext();
-
-        final LinkedList<T> stepContexts = stash.get();
-        stepContexts.removeLast();
-        if (stepContexts.isEmpty()) {
-            stash.remove();
-        }
-    }
-
-    private static class StashThreadLocal<T> extends ThreadLocal<LinkedList<T>> {
-        @Override
-        public LinkedList<T> initialValue() {
-            return new LinkedList<T>();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
index 3c85d55..6e97c00 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
@@ -18,18 +18,11 @@ package org.apache.batchee.cdi.impl;
 
 import org.apache.batchee.cdi.scope.StepScoped;
 
-import javax.batch.runtime.context.StepContext;
 import java.lang.annotation.Annotation;
-import java.util.List;
 
-import static org.apache.batchee.cdi.impl.LocationHolder.currentSteps;
+public class StepContextImpl extends BaseContext {
 
-public class StepContextImpl extends BaseContext<StepContextImpl.StepKey> {
-    public static final BaseContext<?> INSTANCE = new StepContextImpl();
-
-    private StepContextImpl() {
-        // no-op
-    }
+    private ThreadLocal<Long> currentStepContext = new ThreadLocal<Long>();
 
     @Override
     public Class<? extends Annotation> getScope() {
@@ -37,38 +30,21 @@ public class StepContextImpl extends BaseContext<StepContextImpl.StepKey> {
     }
 
     @Override
-    protected StepKey[] currentKeys() {
-        final List<StepContext> stepContexts = currentSteps();
-        final StepKey[] keys = new StepKey[stepContexts.size()];
-
-        int i = 0;
-        for (final StepContext stepContext : stepContexts) {
-            keys[i++] = new StepKey(stepContext.getStepExecutionId());
-        }
-        return keys;
+    protected Long currentKey() {
+        return currentStepContext.get();
     }
 
-    public static class StepKey {
-        private final long stepExecutionId;
-
-        private final int hashCode;
-
-        public StepKey(final long stepExecutionId) {
-            this.stepExecutionId = stepExecutionId;
+    public void enterStep(final Long stepContextId) {
+        currentStepContext.set(stepContextId);
+    }
 
-            hashCode = (int) (stepExecutionId ^ (stepExecutionId >>> 32));
-        }
+    public void exitStep() {
+        Long stepContextId = currentKey();
+        endContext(stepContextId);
+        currentStepContext.set(null);
+        currentStepContext.remove();
+    }
 
-        @Override
-        public boolean equals(final Object o) {
-            return this == o
-                || (!(o == null || getClass() != o.getClass()) && stepExecutionId == StepKey.class.cast(o).stepExecutionId);
 
-        }
 
-        @Override
-        public int hashCode() {
-            return hashCode;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
index 041a0e7..10151a5 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
@@ -16,14 +16,17 @@
  */
 package org.apache.batchee.cdi.listener;
 
-import org.apache.batchee.cdi.impl.JobContextImpl;
-import org.apache.batchee.cdi.impl.LocationHolder;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
 
 import javax.batch.api.listener.JobListener;
+import javax.inject.Inject;
 import javax.inject.Named;
 
 @Named
-public class AfterJobScopeListener extends LocationHolder implements JobListener {
+public class AfterJobScopeListener implements JobListener {
+
+    private @Inject BatchEEScopeExtension scopeExtension;
+
     @Override
     public void beforeJob() throws Exception {
         // no-op
@@ -31,6 +34,6 @@ public class AfterJobScopeListener extends LocationHolder implements JobListener
 
     @Override
     public void afterJob() throws Exception {
-        exitJob(JobContextImpl.INSTANCE);
+        scopeExtension.getJobContext().exitJobExecution();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
index aa1fb21..bcaaacb 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
@@ -16,14 +16,19 @@
  */
 package org.apache.batchee.cdi.listener;
 
-import org.apache.batchee.cdi.impl.LocationHolder;
-import org.apache.batchee.cdi.impl.StepContextImpl;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
 
 import javax.batch.api.listener.StepListener;
+import javax.enterprise.context.Dependent;
+import javax.inject.Inject;
 import javax.inject.Named;
 
 @Named
-public class AfterStepScopeListener extends LocationHolder implements StepListener {
+@Dependent
+public class AfterStepScopeListener implements StepListener {
+
+    private @Inject BatchEEScopeExtension scopeExtension;
+
     @Override
     public void beforeStep() throws Exception {
         // no-op
@@ -31,6 +36,6 @@ public class AfterStepScopeListener extends LocationHolder implements StepListen
 
     @Override
     public void afterStep() throws Exception {
-        exitStep(StepContextImpl.INSTANCE);
+        scopeExtension.getStepContext().exitStep();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
index 4b54729..3f09894 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
@@ -16,7 +16,7 @@
  */
 package org.apache.batchee.cdi.listener;
 
-import org.apache.batchee.cdi.impl.LocationHolder;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
 
 import javax.batch.api.listener.JobListener;
 import javax.batch.runtime.context.JobContext;
@@ -24,13 +24,14 @@ import javax.inject.Inject;
 import javax.inject.Named;
 
 @Named
-public class BeforeJobScopeListener extends LocationHolder implements JobListener {
-    @Inject
-    private JobContext jobContext;
+public class BeforeJobScopeListener implements JobListener {
+
+    private @Inject JobContext jobContext;
+    private @Inject BatchEEScopeExtension scopeExtension;
 
     @Override
     public void beforeJob() throws Exception {
-        enterJob(jobContext);
+        scopeExtension.getJobContext().enterJobExecution(jobContext.getExecutionId());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/ce60c30e/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
index fefc074..6ffc75c 100644
--- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
@@ -16,25 +16,29 @@
  */
 package org.apache.batchee.cdi.listener;
 
-import org.apache.batchee.cdi.impl.LocationHolder;
+import org.apache.batchee.cdi.impl.BatchEEScopeExtension;
 
 import javax.batch.api.listener.StepListener;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 import javax.inject.Named;
 
 @Named
-public class BeforeStepScopeListener extends LocationHolder implements StepListener {
-    @Inject
-    private StepContext stepContext;
+@Dependent
+public class BeforeStepScopeListener implements StepListener {
+
+    private @Inject StepContext stepContext;
+    private @Inject BatchEEScopeExtension scopeExtension;
+
 
     @Override
     public void beforeStep() throws Exception {
-        enterStep(stepContext);
+        scopeExtension.getStepContext().enterStep(stepContext.getStepExecutionId());
     }
 
     @Override
     public void afterStep() throws Exception {
-        // no-op
+        // no op
     }
 }