You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/31 06:33:20 UTC
[30/38] ignite git commit: ignite-2560 Support resource injection for
entry processor, optimizations for resource injection.
ignite-2560 Support resource injection for entry processor, optimizations for resource injection.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9ff97c9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9ff97c9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9ff97c9
Branch: refs/heads/ignite-3443
Commit: f9ff97c91374dcd9cd8ad08d46d1d2de44193060
Parents: 407071e
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Aug 30 09:31:20 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Aug 30 09:32:23 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheLazyEntry.java | 2 +
.../EntryProcessorResourceInjectorProxy.java | 105 +++++
.../processors/cache/GridCacheMapEntry.java | 13 +-
.../GridNearAtomicSingleUpdateFuture.java | 17 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 18 +-
.../transactions/IgniteTxLocalAdapter.java | 5 +-
.../processors/resource/GridResourceIoc.java | 438 +++++++++++++++----
.../resource/GridResourceProcessor.java | 396 +++++++----------
.../cache/GridCacheAbstractFullApiSelfTest.java | 404 +++++++++++++++--
.../cache/GridCacheAbstractSelfTest.java | 140 +++++-
.../GridCacheTransformEventSelfTest.java | 66 ++-
...ePartitionedBasicStoreMultiNodeSelfTest.java | 2 +
.../GridTransformSpringInjectionSelfTest.java | 186 ++++++++
.../testsuites/IgniteSpringTestSuite.java | 7 +-
.../IgniteInvokeWithInjectionBenchmark.java | 74 ++++
.../IgniteInvokeWithInjectionTxBenchmark.java | 30 ++
17 files changed, 1515 insertions(+), 396 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index c8cfc99..02cccc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -191,6 +191,8 @@ public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> {
@Override public <T> T unwrap(Class<T> cls) {
if (cls.isAssignableFrom(Ignite.class))
return (T)cctx.kernalContext().grid();
+ else if (cls.isAssignableFrom(GridCacheContext.class))
+ return (T)cctx;
else if (cls.isAssignableFrom(getClass()))
return cls.cast(this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java
new file mode 100644
index 0000000..76b2511
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Entry processor wrapper that injects Ignite resources into the wrapped (delegate) processor on first execution and then delegates to it.
+ */
+public class EntryProcessorResourceInjectorProxy<K, V, T> implements EntryProcessor<K, V, T>, Serializable {
+ /** Serialization version identifier. */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped entry processor that receives the injected resources and performs the actual processing. */
+ private EntryProcessor<K, V, T> delegate;
+
+ /** Set to {@code true} once resources have been injected into the delegate; declared transient, so it is not serialized with the proxy. */
+ private transient boolean injected;
+
+ /**
+ * @param delegate Entry processor to wrap; the constructor is private, proxies are created through {@link #wrap}.
+ */
+ private EntryProcessorResourceInjectorProxy(EntryProcessor<K, V, T> delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} While {@code injected} is false, first injects resources into the delegate using the cache context unwrapped from the entry. */
+ @Override public T process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException {
+ if (!injected) {
+ GridCacheContext cctx = entry.unwrap(GridCacheContext.class);
+
+ GridResourceProcessor rsrc = cctx.kernalContext().resource();
+
+ try {
+ rsrc.inject(delegate, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, cctx.name());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ injected = true;
+ }
+
+ return delegate.process(entry, arguments);
+ }
+
+ /**
+ * @return Wrapped (delegate) entry processor.
+ */
+ public EntryProcessor<K, V, T> delegate() {
+ return delegate;
+ }
+
+ /**
+ * Wraps the entry processor into an injecting proxy when the resource processor reports annotations from the {@code ENTRY_PROCESSOR} set on it.
+ *
+ * @param ctx Kernal context used to obtain the resource processor.
+ * @param proc Entry processor, may be {@code null}.
+ * @return {@code proc} itself if it is {@code null}, already a proxy, or has no matching annotations; otherwise a new proxy around it.
+ */
+ public static <K, V, T> EntryProcessor<K, V, T> wrap(GridKernalContext ctx,
+ @Nullable EntryProcessor<K, V, T> proc) {
+ if (proc == null || proc instanceof EntryProcessorResourceInjectorProxy)
+ return proc;
+
+ GridResourceProcessor rsrcProcessor = ctx.resource();
+
+ return rsrcProcessor.isAnnotationsPresent(null, proc, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR) ?
+ new EntryProcessorResourceInjectorProxy<>(proc) : proc;
+ }
+
+ /**
+ * Returns the delegate of the given object if it is an injecting proxy.
+ *
+ * @param obj Object that may be an {@link EntryProcessorResourceInjectorProxy}.
+ * @return The proxy's delegate if {@code obj} is a proxy, otherwise {@code obj} itself.
+ */
+ static Object unwrap(Object obj) {
+ return (obj instanceof EntryProcessorResourceInjectorProxy) ? ((EntryProcessorResourceInjectorProxy)obj).delegate() : obj;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f692bf4..c760ac1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -896,6 +896,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(
partition(),
key,
@@ -1004,7 +1006,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
deletedUnlocked(false);
}
- if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(
partition(),
key,
@@ -1019,6 +1023,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
transformClo != null ? transformClo.getClass().getName() : null,
taskName,
keepBinary);
+ }
}
}
@@ -1685,7 +1690,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Calculate new value.
if (op == GridCacheOperation.TRANSFORM) {
- transformCloClsName = writeObj.getClass().getName();
+ transformCloClsName = EntryProcessorResourceInjectorProxy.unwrap(writeObj).getClass().getName();
EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
@@ -2463,6 +2468,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
evtOld = cctx.unwrapTemporary(oldVal);
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(partition(), key, evtNodeId, null,
newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
@@ -2553,6 +2560,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
evtOld = cctx.unwrapTemporary(oldVal);
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(partition(), key, evtNodeId, null,
newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 661a178..256c7ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,6 +17,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -26,6 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -43,13 +51,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -549,6 +550,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2432f63..30a0c3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -800,7 +802,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
+ conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
@@ -826,6 +828,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
List<ClusterNode> affNodes = mapKey(cacheKey, topVer);
@@ -940,6 +944,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
ClusterNode primary = cctx.affinity().primary(cacheKey.partition(), topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index ac08f8f..a419887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -57,6 +57,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -432,7 +434,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
needVer);
}
-
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
@@ -511,7 +512,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
if (entry != null) {
- CacheObject v ;
+ CacheObject v;
GridCacheVersion ver;
if (needVer) {
@@ -541,7 +542,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
deserializeBinary,
true,
ver);
- }else
+ }
+ else
success = false;
}
else {
@@ -944,6 +946,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
if (op == UPDATE)
val = ctx.toCacheObject(val);
+ else if (op == TRANSFORM)
+ ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name());
while (true) {
GridCacheEntryEx entry = null;
@@ -1014,7 +1018,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
if (err != null)
throw err;
- Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, keepBinary, res.get2(), res.get1()) :
+ Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, keepBinary, res.get2(), res.get1()) :
(retval || op == TRANSFORM) ? res.get2() : res.get1();
if (op == TRANSFORM && ret == null)
@@ -1035,8 +1039,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
* @param filter Optional filter.
* @param subjId Subject ID.
* @param taskName Task name.
- * @throws CachePartialUpdateCheckedException If update failed.
* @return Results map for invoke operation.
+ * @throws CachePartialUpdateCheckedException If update failed.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
private Map<K, EntryProcessorResult> updateWithBatch(
@@ -1101,6 +1105,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
if (op == TRANSFORM) {
+ ctx.kernalContext().resource().inject(val,
+ GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR,
+ ctx.name());
+
EntryProcessor<Object, Object, Object> entryProcessor =
(EntryProcessor<Object, Object, Object>)val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 9ad7fb0..ee992cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -65,7 +66,6 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -89,6 +89,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
@@ -3664,7 +3665,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
this,
op,
val,
- entryProcessor,
+ EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProcessor),
invokeArgs,
hasDrTtl ? drTtl : -1L,
entry,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 35824fa..0158973 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -21,12 +21,12 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.util.GridLeanIdentitySet;
@@ -35,6 +35,17 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.CacheNameResource;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.resources.LoadBalancerResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.ServiceResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.resources.TaskContinuousMapperResource;
+import org.apache.ignite.resources.TaskSessionResource;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -42,17 +53,12 @@ import org.jsr166.ConcurrentHashMap8;
* Resource container contains caches for classes used for injection.
* Caches used to improve the efficiency of standard Java reflection mechanism.
*/
-class GridResourceIoc {
+public class GridResourceIoc {
/** Task class resource mapping. Used to efficiently cleanup resources related to class loader. */
- private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap =
- new ConcurrentHashMap8<>();
+ private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap = new ConcurrentHashMap8<>();
/** Class descriptors cache. */
- private final ConcurrentMap<Class<?>, ClassDescriptor> clsDescs = new ConcurrentHashMap8<>();
-
- /** */
- private final ConcurrentMap<Class<?>, Class<? extends Annotation>[]> annCache =
- new ConcurrentHashMap8<>();
+ private AtomicReference<Map<Class<?>, ClassDescriptor>> clsDescs = new AtomicReference<>();
/**
* @param ldr Class loader.
@@ -61,8 +67,22 @@ class GridResourceIoc {
Set<Class<?>> clss = taskMap.remove(ldr);
if (clss != null) {
- clsDescs.keySet().removeAll(clss);
- annCache.keySet().removeAll(clss);
+ Map<Class<?>, ClassDescriptor> newMap, oldMap;
+
+ do {
+ oldMap = clsDescs.get();
+
+ if (oldMap == null)
+ break;
+
+ newMap = new HashMap<>(oldMap.size() - clss.size());
+
+ for (Map.Entry<Class<?>, ClassDescriptor> entry : oldMap.entrySet()) {
+ if (!clss.contains(entry.getKey()))
+ newMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ while (!clsDescs.compareAndSet(oldMap, newMap));
}
}
@@ -71,8 +91,8 @@ class GridResourceIoc {
*/
void undeployAll() {
taskMap.clear();
- clsDescs.clear();
- annCache.clear();
+
+ clsDescs.set(null);
}
/**
@@ -83,8 +103,8 @@ class GridResourceIoc {
* @param injector Resource to inject.
* @param dep Deployment.
* @param depCls Deployment class.
- * @throws IgniteCheckedException Thrown in case of any errors during injection.
* @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
*/
@SuppressWarnings("SimplifiableIfStatement")
boolean inject(Object target,
@@ -92,26 +112,41 @@ class GridResourceIoc {
GridResourceInjector injector,
@Nullable GridDeployment dep,
@Nullable Class<?> depCls)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
return injectInternal(target, annCls, injector, dep, depCls, null);
}
/**
+ * @param dep Deployment.
* @param cls Class.
+ * @return Descriptor.
*/
- private ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) {
- ClassDescriptor res = clsDescs.get(cls);
+ ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) {
+ Map<Class<?>, ClassDescriptor> newMap, oldMap;
+ ClassDescriptor res, newDesc = null;
+
+ do {
+ oldMap = clsDescs.get();
+
+ if (oldMap != null && (res = oldMap.get(cls)) != null)
+ break;
- if (res == null) {
if (dep != null) {
Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
classes.add(cls);
+
+ dep = null;
}
- res = F.addIfAbsent(clsDescs, cls, new ClassDescriptor(cls));
+ if (oldMap == null)
+ newMap = new HashMap<>();
+ else
+ (newMap = new HashMap<>(oldMap.size() + 1)).putAll(oldMap);
+
+ newMap.put(cls, res = newDesc == null ? (newDesc = new ClassDescriptor(cls)) : newDesc);
}
+ while (!clsDescs.compareAndSet(oldMap, newMap));
return res;
}
@@ -123,8 +158,8 @@ class GridResourceIoc {
* @param dep Deployment.
* @param depCls Deployment class.
* @param checkedObjs Set of already inspected objects to avoid indefinite recursion.
- * @throws IgniteCheckedException Thrown in case of any errors during injection.
* @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
*/
private boolean injectInternal(Object target,
Class<? extends Annotation> annCls,
@@ -132,56 +167,14 @@ class GridResourceIoc {
@Nullable GridDeployment dep,
@Nullable Class<?> depCls,
@Nullable Set<Object> checkedObjs)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
Class<?> targetCls = target.getClass();
ClassDescriptor descr = descriptor(dep, targetCls);
T2<GridResourceField[], GridResourceMethod[]> annotatedMembers = descr.annotatedMembers(annCls);
- if (descr.recursiveFields().length == 0 && annotatedMembers == null)
- return false;
-
- if (checkedObjs == null && descr.recursiveFields().length > 0)
- checkedObjs = new GridLeanIdentitySet<>();
-
- if (checkedObjs != null && !checkedObjs.add(target))
- return false;
-
- boolean injected = false;
-
- for (Field field : descr.recursiveFields()) {
- try {
- Object obj = field.get(target);
-
- if (obj != null) {
- assert checkedObjs != null;
-
- injected |= injectInternal(obj, annCls, injector, dep, depCls, checkedObjs);
- }
- }
- catch (IllegalAccessException e) {
- throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() +
- ", target=" + target + ']', e);
- }
- }
-
- if (annotatedMembers != null) {
- for (GridResourceField field : annotatedMembers.get1()) {
- injector.inject(field, target, depCls, dep);
-
- injected = true;
- }
-
- for (GridResourceMethod mtd : annotatedMembers.get2()) {
- injector.inject(mtd, target, depCls, dep);
-
- injected = true;
- }
- }
-
- return injected;
+ return descr.injectInternal(target, annCls, annotatedMembers, injector, dep, depCls, checkedObjs);
}
/**
@@ -202,36 +195,18 @@ class GridResourceIoc {
}
/**
+ * Checks if annotation is presented on a field or method of the specified object.
+ *
+ * @param target Target object.
+ * @param annSet Annotation classes to find on fields or methods of target object.
* @param dep Deployment.
- * @param target Target.
- * @param annClss Annotations.
- * @return Filtered set of annotations that present in target.
+ * @return {@code true} if any annotation is presented, {@code false} if it's not.
*/
- @SuppressWarnings({"SuspiciousToArrayCall", "unchecked"})
- Class<? extends Annotation>[] filter(
- @Nullable GridDeployment dep, Object target,
- Collection<Class<? extends Annotation>> annClss) {
+ boolean isAnnotationsPresent(@Nullable GridDeployment dep, Object target, AnnotationSet annSet) {
assert target != null;
- assert annClss != null && !annClss.isEmpty();
+ assert annSet != null;
- Class<?> cls = target.getClass();
-
- Class<? extends Annotation>[] res = annCache.get(cls);
-
- if (res == null) {
- Collection<Class<? extends Annotation>> res0 = new ArrayList<>();
-
- for (Class<? extends Annotation> annCls : annClss) {
- if (isAnnotationPresent(target, annCls, dep))
- res0.add(annCls);
- }
-
- res = res0.toArray(new Class[res0.size()]);
-
- annCache.putIfAbsent(cls, res);
- }
-
- return res;
+ return descriptor(dep, target.getClass()).isAnnotated(annSet) != 0;
}
/**
@@ -251,16 +226,18 @@ class GridResourceIoc {
return t2 == null ? GridResourceMethod.EMPTY_ARRAY : t2.get2();
}
- /** {@inheritDoc} */
+ /** Prints memory statistics. */
public void printMemoryStats() {
X.println(">>> taskMapSize: " + taskMap.size());
- X.println(">>> classDescriptorsCacheSize: " + clsDescs.size());
+
+ Map<Class<?>, ClassDescriptor> map = clsDescs.get();
+ X.println(">>> classDescriptorsCacheSize: " + (map == null ? 0 : map.size()));
}
/**
*
*/
- private static class ClassDescriptor {
+ class ClassDescriptor {
/** */
private final Field[] recursiveFields;
@@ -268,8 +245,18 @@ class GridResourceIoc {
private final Map<Class<? extends Annotation>, T2<GridResourceField[], GridResourceMethod[]>> annMap;
/**
+ * Used as an enum-map with an {@link AnnotationSet} member as the key and,
+ * as the value, a bitmap resulting from matching the found annotations against the {@link ResourceAnnotation} enum set.
+ */
+ private final int[] containsAnnSets;
+
+ /** Used as an enum-map with {@link ResourceAnnotation} members as keys. */
+ private final T2<GridResourceField[], GridResourceMethod[]>[] annArr;
+
+ /**
* @param cls Class.
*/
+ @SuppressWarnings("unchecked")
ClassDescriptor(Class<?> cls) {
Map<Class<? extends Annotation>, T2<List<GridResourceField>, List<GridResourceMethod>>> annMap
= new HashMap<>();
@@ -335,20 +322,277 @@ class GridResourceIoc {
this.annMap.put(entry.getKey(), new T2<>(fields, mtds));
}
+
+ T2<GridResourceField[], GridResourceMethod[]>[] annArr = null;
+
+ if (annMap.isEmpty())
+ containsAnnSets = null;
+ else {
+ int annotationsBits = 0;
+
+ for (ResourceAnnotation ann : ResourceAnnotation.values()) {
+ T2<GridResourceField[], GridResourceMethod[]> member = annotatedMembers(ann.clazz);
+
+ if (member != null) {
+ if (annArr == null)
+ annArr = new T2[ResourceAnnotation.values().length];
+
+ annArr[ann.ordinal()] = member;
+
+ annotationsBits |= 1 << ann.ordinal();
+ }
+ }
+
+ AnnotationSet[] annotationSets = AnnotationSet.values();
+
+ containsAnnSets = new int[annotationSets.length];
+
+ for (int i = 0; i < annotationSets.length; i++)
+ containsAnnSets[i] = annotationsBits & annotationSets[i].annotationsBitSet;
+ }
+
+ this.annArr = annArr;
}
/**
* @return Recursive fields.
*/
- public Field[] recursiveFields() {
+ Field[] recursiveFields() {
return recursiveFields;
}
/**
+ * @param annCls Annotation class.
* @return Fields.
*/
- @Nullable public T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) {
+ @Nullable T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) {
return annMap.get(annCls);
}
+
+ /**
+ * @param set Annotation set.
+ * @return Non-zero bitmask if any annotation of the set is present, otherwise {@code 0}.
+ */
+ int isAnnotated(AnnotationSet set) {
+ return recursiveFields.length > 0 ? set.annotationsBitSet :
+ (containsAnnSets == null ? 0 : containsAnnSets[set.ordinal()]);
+ }
+
+ /**
+ * @param ann Annotation.
+ * @return {@code True} if the annotation is present.
+ */
+ boolean isAnnotated(ResourceAnnotation ann) {
+ return recursiveFields.length > 0 || (annArr != null && annArr[ann.ordinal()] != null);
+ }
+
+ /**
+ * @param target Target object.
+ * @param annCls Annotation class.
+ * @param annotatedMembers Annotated fields and methods.
+ * @param injector Resource to inject.
+ * @param dep Deployment.
+ * @param depCls Deployment class.
+ * @param checkedObjs Set of already inspected objects to avoid infinite recursion.
+ * @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
+ */
+ boolean injectInternal(Object target,
+ Class<? extends Annotation> annCls,
+ T2<GridResourceField[], GridResourceMethod[]> annotatedMembers,
+ GridResourceInjector injector,
+ @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls,
+ @Nullable Set<Object> checkedObjs)
+ throws IgniteCheckedException {
+ if (recursiveFields.length == 0 && annotatedMembers == null)
+ return false;
+
+ if (checkedObjs == null && recursiveFields.length > 0)
+ checkedObjs = new GridLeanIdentitySet<>();
+
+ if (checkedObjs != null && !checkedObjs.add(target))
+ return false;
+
+ boolean injected = false;
+
+ for (Field field : recursiveFields) {
+ try {
+ Object obj = field.get(target);
+
+ if (obj != null) {
+ assert checkedObjs != null;
+
+ ClassDescriptor desc = descriptor(dep, obj.getClass());
+ injected |= desc.injectInternal(obj, annCls, desc.annotatedMembers(annCls),
+ injector, dep, depCls, checkedObjs);
+ }
+ }
+ catch (IllegalAccessException e) {
+ throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() +
+ ", target=" + target + ']', e);
+ }
+ }
+
+ if (annotatedMembers != null) {
+ for (GridResourceField field : annotatedMembers.get1()) {
+ injector.inject(field, target, depCls, dep);
+
+ injected = true;
+ }
+
+ for (GridResourceMethod mtd : annotatedMembers.get2()) {
+ injector.inject(mtd, target, depCls, dep);
+
+ injected = true;
+ }
+ }
+
+ return injected;
+ }
+
+ /**
+ * @param target Target object.
+ * @param ann Resource annotation.
+ * @param injector Resource to inject.
+ * @param dep Deployment.
+ * @param depCls Deployment class.
+ * @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
+ */
+ public boolean inject(Object target,
+ ResourceAnnotation ann,
+ GridResourceInjector injector,
+ @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls)
+ throws IgniteCheckedException {
+ return injectInternal(target,
+ ann.clazz,
+ annArr == null ? null : annArr[ann.ordinal()],
+ injector,
+ dep,
+ depCls,
+ null);
+ }
+ }
+
+ /**
+ *
+ */
+ enum ResourceAnnotation {
+ /** */
+ CACHE_NAME(CacheNameResource.class),
+
+ /** */
+ SPRING_APPLICATION_CONTEXT(SpringApplicationContextResource.class),
+
+ /** */
+ SPRING(SpringResource.class),
+
+ /** */
+ IGNITE_INSTANCE(IgniteInstanceResource.class),
+
+ /** */
+ LOGGER(LoggerResource.class),
+
+ /** */
+ SERVICE(ServiceResource.class),
+
+ /** */
+ TASK_SESSION(TaskSessionResource.class),
+
+ /** */
+ LOAD_BALANCER(LoadBalancerResource.class),
+
+ /** */
+ TASK_CONTINUOUS_MAPPER(TaskContinuousMapperResource.class),
+
+ /** */
+ JOB_CONTEXT(JobContextResource.class),
+
+ /** */
+ CACHE_STORE_SESSION(CacheStoreSessionResource.class);
+
+ /** */
+ public final Class<? extends Annotation> clazz;
+
+ /**
+ * @param clazz annotation class.
+ */
+ ResourceAnnotation(Class<? extends Annotation> clazz) {
+ this.clazz = clazz;
+ }
+ }
+
+ /**
+ *
+ */
+ public enum AnnotationSet {
+ /** */
+ GENERIC(
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ ),
+
+ /** */
+ ENTRY_PROCESSOR(
+ ResourceAnnotation.CACHE_NAME,
+
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ ),
+
+ /** */
+ TASK(
+ ResourceAnnotation.TASK_SESSION,
+ ResourceAnnotation.LOAD_BALANCER,
+ ResourceAnnotation.TASK_CONTINUOUS_MAPPER,
+
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ ),
+
+ /** */
+ JOB(
+ ResourceAnnotation.TASK_SESSION,
+ ResourceAnnotation.JOB_CONTEXT,
+
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ );
+
+ /** Resource annotations bits for fast checks. */
+ public final int annotationsBitSet;
+
+ /** Holds annotations in the order they were passed to the constructor. */
+ public final ResourceAnnotation[] annotations;
+
+ /**
+ * @param annotations ResourceAnnotations.
+ */
+ AnnotationSet(ResourceAnnotation... annotations) {
+ assert annotations.length < 32 : annotations.length;
+
+ this.annotations = annotations;
+
+ int mask = 0;
+
+ for (ResourceAnnotation ann : annotations)
+ mask |= 1 << ann.ordinal();
+
+ annotationsBitSet = mask;
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index afe0ef1..84d07b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -20,12 +20,11 @@ package org.apache.ignite.internal.processors.resource;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Arrays;
import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeLoadBalancer;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskContinuousMapper;
@@ -34,22 +33,10 @@ import org.apache.ignite.internal.GridInternalWrapper;
import org.apache.ignite.internal.GridJobContextImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lifecycle.LifecycleBean;
-import org.apache.ignite.resources.CacheNameResource;
-import org.apache.ignite.resources.CacheStoreSessionResource;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.JobContextResource;
-import org.apache.ignite.resources.LoadBalancerResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.resources.ServiceResource;
-import org.apache.ignite.resources.SpringApplicationContextResource;
-import org.apache.ignite.resources.SpringResource;
-import org.apache.ignite.resources.TaskContinuousMapperResource;
-import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.spi.IgniteSpi;
import org.jetbrains.annotations.Nullable;
@@ -58,42 +45,6 @@ import org.jetbrains.annotations.Nullable;
* Processor for all Ignite and task/job resources.
*/
public class GridResourceProcessor extends GridProcessorAdapter {
- /** */
- private static final Collection<Class<? extends Annotation>> JOB_INJECTIONS = Arrays.asList(
- TaskSessionResource.class,
- JobContextResource.class,
- IgniteInstanceResource.class,
- SpringApplicationContextResource.class,
- SpringResource.class,
- LoggerResource.class,
- ServiceResource.class);
-
- /** */
- private static final Collection<Class<? extends Annotation>> TASK_INJECTIONS = Arrays.asList(
- TaskSessionResource.class,
- LoadBalancerResource.class,
- TaskContinuousMapperResource.class,
- IgniteInstanceResource.class,
- SpringApplicationContextResource.class,
- SpringResource.class,
- LoggerResource.class,
- ServiceResource.class);
-
- /** Grid instance injector. */
- private GridResourceBasicInjector<IgniteEx> gridInjector;
-
- /** Spring application context injector. */
- private GridResourceInjector springCtxInjector;
-
- /** Logger injector. */
- private GridResourceBasicInjector<IgniteLogger> logInjector;
-
- /** Services injector. */
- private GridResourceBasicInjector<Collection<Service>> srvcInjector;
-
- /** Spring bean resources injector. */
- private GridResourceInjector springBeanInjector;
-
/** Cleaning injector. */
private final GridResourceInjector nullInjector = new GridResourceBasicInjector<>(null);
@@ -103,6 +54,9 @@ public class GridResourceProcessor extends GridProcessorAdapter {
/** */
private final GridResourceIoc ioc = new GridResourceIoc();
+ /** */
+ private final GridResourceInjector[] injectorByAnnotation;
+
/**
* Creates resources processor.
*
@@ -111,9 +65,14 @@ public class GridResourceProcessor extends GridProcessorAdapter {
public GridResourceProcessor(GridKernalContext ctx) {
super(ctx);
- gridInjector = new GridResourceBasicInjector<>(ctx.grid());
- logInjector = new GridResourceLoggerInjector(ctx.config().getGridLogger());
- srvcInjector = new GridResourceServiceInjector(ctx.grid());
+ injectorByAnnotation = new GridResourceInjector[GridResourceIoc.ResourceAnnotation.values().length];
+
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SERVICE.ordinal()] =
+ new GridResourceServiceInjector(ctx.grid());
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.LOGGER.ordinal()] =
+ new GridResourceLoggerInjector(ctx.config().getGridLogger());
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] =
+ new GridResourceBasicInjector<>(ctx.grid());
}
/** {@inheritDoc} */
@@ -138,8 +97,12 @@ public class GridResourceProcessor extends GridProcessorAdapter {
public void setSpringContext(@Nullable GridSpringResourceContext rsrcCtx) {
this.rsrcCtx = rsrcCtx;
- springCtxInjector = rsrcCtx != null ? rsrcCtx.springContextInjector() : nullInjector;
- springBeanInjector = rsrcCtx != null ? rsrcCtx.springBeanInjector() : nullInjector;
+ GridResourceInjector springCtxInjector = rsrcCtx != null ? rsrcCtx.springContextInjector() : nullInjector;
+ GridResourceInjector springBeanInjector = rsrcCtx != null ? rsrcCtx.springBeanInjector() : nullInjector;
+
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING.ordinal()] = springBeanInjector;
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING_APPLICATION_CONTEXT.ordinal()] =
+ springCtxInjector;
}
/**
@@ -187,17 +150,15 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void inject(GridDeployment dep, Class<?> depCls, Object target) throws IgniteCheckedException {
+ assert target != null;
+
if (log.isDebugEnabled())
log.debug("Injecting resources: " + target);
// Unwrap Proxy object.
target = unwrapTarget(target);
- ioc.inject(target, IgniteInstanceResource.class, gridInjector, dep, depCls);
- ioc.inject(target, SpringApplicationContextResource.class, springCtxInjector, dep, depCls);
- ioc.inject(target, SpringResource.class, springBeanInjector, dep, depCls);
- ioc.inject(target, LoggerResource.class, logInjector, dep, depCls);
- ioc.inject(target, ServiceResource.class, srvcInjector, dep, depCls);
+ inject(target, GridResourceIoc.AnnotationSet.GENERIC, dep, depCls);
}
/**
@@ -216,7 +177,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
obj = unwrapTarget(obj);
- ioc.inject(obj, CacheNameResource.class, new GridResourceBasicInjector<>(cacheName), null, null);
+ inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_NAME, null, null, cacheName);
}
/**
@@ -236,7 +197,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
obj = unwrapTarget(obj);
- return ioc.inject(obj, CacheStoreSessionResource.class, new GridResourceBasicInjector<>(ses), null, null);
+ return inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_STORE_SESSION, null, null, ses);
}
/**
@@ -244,6 +205,17 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed to inject.
*/
public void injectGeneric(Object obj) throws IgniteCheckedException {
+ inject(obj, GridResourceIoc.AnnotationSet.GENERIC);
+ }
+
+ /**
+ * @param obj Object to inject.
+ * @param annSet Supported annotations.
+ * @param params Parameters.
+ * @throws IgniteCheckedException If failed to inject.
+ */
+ public void inject(Object obj, GridResourceIoc.AnnotationSet annSet, Object... params)
+ throws IgniteCheckedException {
assert obj != null;
if (log.isDebugEnabled())
@@ -252,33 +224,126 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
obj = unwrapTarget(obj);
- // No deployment for lifecycle beans.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
+ inject(obj, annSet, null, null, params);
+ }
+
+ /**
+ * @param obj Object to inject.
+ * @param annSet Supported annotations.
+ * @param dep Deployment.
+ * @param depCls Deployment class.
+ * @param params Parameters.
+ * @throws IgniteCheckedException If failed to inject.
+ */
+ private void inject(Object obj,
+ GridResourceIoc.AnnotationSet annSet,
+ @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls,
+ Object... params)
+ throws IgniteCheckedException {
+ GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass());
+
+ assert clsDesc != null;
+
+ if (clsDesc.isAnnotated(annSet) == 0)
+ return;
+
+ int i = 0;
+ for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations) {
+ if (clsDesc.isAnnotated(ann)) {
+ final GridResourceInjector injector = injectorByAnnotation(ann, i < params.length ? params[i] : null);
+
+ if (injector != null)
+ clsDesc.inject(obj, ann, injector, dep, depCls);
+ }
+
+ i++;
+ }
}
/**
* @param obj Object.
+ * @param annSet Supported annotations.
* @throws IgniteCheckedException If failed.
*/
- public void cleanupGeneric(Object obj) throws IgniteCheckedException {
- if (obj != null) {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + obj);
-
- // Unwrap Proxy object.
- obj = unwrapTarget(obj);
-
- // Caching key is null for the life-cycle beans.
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
+ private void cleanup(Object obj, GridResourceIoc.AnnotationSet annSet)
+ throws IgniteCheckedException {
+ assert obj != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Cleaning up resources: " + obj);
+
+ // Unwrap Proxy object.
+ obj = unwrapTarget(obj);
+
+ GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass());
+
+ assert clsDesc != null;
+
+ if (clsDesc.isAnnotated(annSet) == 0)
+ return;
+
+ for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations)
+ clsDesc.inject(obj, ann, nullInjector, null, null);
+ }
+
+ /**
+ * @param ann Annotation.
+ * @param param Injector parameter.
+ * @return Injector.
+ */
+ private GridResourceInjector injectorByAnnotation(GridResourceIoc.ResourceAnnotation ann, Object param) {
+ final GridResourceInjector res;
+
+ switch (ann) {
+ case CACHE_NAME:
+ case TASK_SESSION:
+ case LOAD_BALANCER:
+ case TASK_CONTINUOUS_MAPPER:
+ case CACHE_STORE_SESSION:
+ res = new GridResourceBasicInjector<>(param);
+ break;
+
+ case JOB_CONTEXT:
+ res = new GridResourceJobContextInjector((ComputeJobContext)param);
+ break;
+
+ default:
+ res = injectorByAnnotation[ann.ordinal()];
+ break;
}
+
+ return res;
+ }
+
+ /**
+ * @param obj Object to inject.
+ * @throws IgniteCheckedException If failed to inject.
+ */
+ private boolean inject(Object obj, GridResourceIoc.ResourceAnnotation ann, @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls, Object param)
+ throws IgniteCheckedException {
+ GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass());
+
+ assert clsDesc != null;
+
+ if (clsDesc.isAnnotated(ann)) {
+ GridResourceInjector injector = injectorByAnnotation(ann, param);
+
+ if (injector != null)
+ return clsDesc.inject(obj, ann, injector, dep, depCls);
+ }
+
+ return false;
+ }
+
+ /**
+ * @param obj Object.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void cleanupGeneric(Object obj) throws IgniteCheckedException {
+ if (obj != null)
+ cleanup(obj, GridResourceIoc.AnnotationSet.GENERIC);
}
/**
@@ -321,30 +386,8 @@ public class GridResourceProcessor extends GridProcessorAdapter {
*/
private void injectToJob(GridDeployment dep, Class<?> taskCls, Object job, ComputeTaskSession ses,
GridJobContextImpl jobCtx) throws IgniteCheckedException {
- Class<? extends Annotation>[] filtered = ioc.filter(dep, job, JOB_INJECTIONS);
-
- if (filtered.length > 0) {
- for (Class<? extends Annotation> annCls : filtered) {
- if (annCls == TaskSessionResource.class)
- injectBasicResource(job, TaskSessionResource.class, ses, dep, taskCls);
- else if (annCls == JobContextResource.class)
- ioc.inject(job, JobContextResource.class, new GridResourceJobContextInjector(jobCtx),
- dep, taskCls);
- else if (annCls == IgniteInstanceResource.class)
- ioc.inject(job, IgniteInstanceResource.class, gridInjector, dep, taskCls);
- else if (annCls == SpringApplicationContextResource.class)
- ioc.inject(job, SpringApplicationContextResource.class, springCtxInjector, dep, taskCls);
- else if (annCls == SpringResource.class)
- ioc.inject(job, SpringResource.class, springBeanInjector, dep, taskCls);
- else if (annCls == LoggerResource.class)
- ioc.inject(job, LoggerResource.class, logInjector, dep, taskCls);
- else {
- assert annCls == ServiceResource.class;
-
- ioc.inject(job, ServiceResource.class, srvcInjector, dep, taskCls);
- }
- }
- }
+
+ inject(job, GridResourceIoc.AnnotationSet.JOB, dep, taskCls, ses, jobCtx);
}
/**
@@ -365,34 +408,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
Object obj = unwrapTarget(task);
- Class<? extends Annotation>[] filtered = ioc.filter(dep, obj, TASK_INJECTIONS);
-
- if (filtered.length == 0)
- return;
-
- Class<?> taskCls = obj.getClass();
-
- for (Class<? extends Annotation> annCls : filtered) {
- if (annCls == TaskSessionResource.class)
- injectBasicResource(obj, TaskSessionResource.class, ses, dep, taskCls);
- else if (annCls == LoadBalancerResource.class)
- injectBasicResource(obj, LoadBalancerResource.class, balancer, dep, taskCls);
- else if (annCls == TaskContinuousMapperResource.class)
- injectBasicResource(obj, TaskContinuousMapperResource.class, mapper, dep, taskCls);
- else if (annCls == IgniteInstanceResource.class)
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, dep, taskCls);
- else if (annCls == SpringApplicationContextResource.class)
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, dep, taskCls);
- else if (annCls == SpringResource.class)
- ioc.inject(obj, SpringResource.class, springBeanInjector, dep, taskCls);
- else if (annCls == LoggerResource.class)
- ioc.inject(obj, LoggerResource.class, logInjector, dep, taskCls);
- else {
- assert annCls == ServiceResource.class;
-
- ioc.inject(obj, ServiceResource.class, srvcInjector, dep, taskCls);
- }
- }
+ inject(obj, GridResourceIoc.AnnotationSet.TASK, dep, null, ses, balancer, mapper);
}
/**
@@ -408,24 +424,25 @@ public class GridResourceProcessor extends GridProcessorAdapter {
}
/**
+ * Checks whether any annotation of the given set is present in the specified object.
+ *
+ * @param dep Class deployment.
+ * @param target Object to check.
+ * @param annSet Annotations to find.
+ * @return {@code true} if any annotation is present, {@code false} otherwise.
+ */
+ public boolean isAnnotationsPresent(GridDeployment dep, Object target, GridResourceIoc.AnnotationSet annSet) {
+ return ioc.isAnnotationsPresent(dep, target, annSet);
+ }
+
+ /**
* Injects held resources into given SPI implementation.
*
* @param spi SPI implementation.
* @throws IgniteCheckedException Throw in case of any errors.
*/
public void inject(IgniteSpi spi) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Injecting resources: " + spi);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(spi);
-
- // Caching key is null for the SPIs.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
+ injectGeneric(spi);
}
/**
@@ -436,17 +453,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void cleanup(IgniteSpi spi) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + spi);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(spi);
-
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
+ cleanupGeneric(spi);
}
/**
@@ -456,18 +463,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void inject(LifecycleBean lifecycleBean) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Injecting resources: " + lifecycleBean);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(lifecycleBean);
-
- // No deployment for lifecycle beans.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
+ injectGeneric(lifecycleBean);
}
/**
@@ -478,18 +474,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void cleanup(LifecycleBean lifecycleBean) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + lifecycleBean);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(lifecycleBean);
-
- // Caching key is null for the life-cycle beans.
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
+ cleanupGeneric(lifecycleBean);
}
/**
@@ -499,18 +484,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
public void inject(Service svc) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Injecting resources: " + svc);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(svc);
-
- // No deployment for lifecycle beans.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
+ injectGeneric(svc);
}
/**
@@ -521,39 +495,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void cleanup(Service svc) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + svc);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(svc);
-
- // Caching key is null for the life-cycle beans.
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
- }
-
- /**
- * This method is declared public as it is used from tests as well.
- * Note, that this method can be used only with unwrapped objects
- * (see {@link #unwrapTarget(Object)}).
- *
- * @param target Target object.
- * @param annCls Setter annotation.
- * @param rsrc Resource to inject.
- * @param dep Deployment.
- * @param depCls Deployed class.
- * @throws IgniteCheckedException If injection failed.
- */
- public void injectBasicResource(Object target, Class<? extends Annotation> annCls, Object rsrc,
- GridDeployment dep, Class<?> depCls) throws IgniteCheckedException {
- // Safety.
- assert !(rsrc instanceof GridResourceInjector) : "Invalid injection.";
-
- // Basic injection don't cache anything. Use null as a key.
- ioc.inject(target, annCls, new GridResourceBasicInjector<>(rsrc), dep, depCls);
+ cleanupGeneric(svc);
}
/**
@@ -602,4 +544,4 @@ public class GridResourceProcessor extends GridProcessorAdapter {
ioc.printMemoryStats();
}
-}
\ No newline at end of file
+}