You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/31 03:40:05 UTC
[1/9] ignite git commit: IGNITE-2546 - Transformers for SCAN queries.
Fixes #949.
Repository: ignite
Updated Branches:
refs/heads/master d98cd3093 -> 8f697876a
IGNITE-2546 - Transformers for SCAN queries. Fixes #949.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/407071e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/407071e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/407071e4
Branch: refs/heads/master
Commit: 407071e466d1a4fcec86571d4791ace8bb206f53
Parents: 0465874
Author: Eduard Shangareev <es...@gridgain.com>
Authored: Mon Aug 29 17:32:31 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Mon Aug 29 17:32:31 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 15 +
.../processors/cache/IgniteCacheProxy.java | 112 +++-
.../processors/cache/query/CacheQuery.java | 11 +-
.../query/GridCacheDistributedQueryManager.java | 22 +-
.../cache/query/GridCacheLocalQueryManager.java | 3 +-
.../cache/query/GridCacheQueryAdapter.java | 69 ++-
.../cache/query/GridCacheQueryBean.java | 8 +-
.../cache/query/GridCacheQueryInfo.java | 8 +-
.../cache/query/GridCacheQueryManager.java | 125 ++--
.../cache/query/GridCacheQueryRequest.java | 6 +-
.../GridCacheQueryTransformerSelfTest.java | 570 +++++++++++++++++++
.../multijvm/IgniteCacheProcessProxy.java | 6 +
.../IgniteCacheQuerySelfTestSuite.java | 2 +
13 files changed, 832 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 40eedaf..2290fc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.transactions.TransactionHeuristicException;
@@ -295,6 +296,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
public <R> QueryCursor<R> query(Query<R> qry);
/**
+ * Queries the cache, transforming the entries on the server nodes. Can be used, for example,
+ * to avoid network overhead when the client requires only one field of a large object.
+ * <p>
+ * Currently transformers are supported ONLY for {@link ScanQuery}. Passing any other
+ * subclass of {@link Query} to this method will result in an
+ * {@link UnsupportedOperationException}.
+ *
+ * @param qry Query.
+ * @param transformer Transformer.
+ * @return Cursor.
+ */
+ public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer);
+
+ /**
* Allows for iteration over local cache entries.
*
* @param peekModes Peek modes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 0d7bc6a..9b26c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -466,50 +467,74 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * @param filter Filter.
+ * @param scanQry Scan query.
+ * @param transformer Transformer.
* @param grp Optional cluster group.
* @return Cursor.
*/
@SuppressWarnings("unchecked")
- private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+ private <T, R> QueryCursor<R> query(
+ final ScanQuery scanQry,
+ @Nullable final IgniteClosure<T, R> transformer,
+ @Nullable ClusterGroup grp)
throws IgniteCheckedException {
- final CacheQuery<Map.Entry<K, V>> qry;
+
+ final CacheQuery<R> qry;
boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
- if (filter instanceof ScanQuery) {
- IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
+ IgniteBiPredicate<K, V> p = scanQry.getFilter();
- qry = ctx.queries().createScanQuery(p, ((ScanQuery)filter).getPartition(), isKeepBinary);
+ qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary);
- if (grp != null)
- qry.projection(grp);
+ if (grp != null)
+ qry.projection(grp);
- final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx,
- new IgniteOutClosureX<GridCloseableIterator<Entry<K, V>>>() {
- @Override public GridCloseableIterator<Entry<K, V>> applyx() throws IgniteCheckedException {
- final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery();
+ final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(ctx,
+ new IgniteOutClosureX<GridCloseableIterator<R>>() {
+ @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
+ final GridCloseableIterator iter0 = qry.executeScanQuery();
- return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
- @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException {
- Map.Entry<K, V> next = iter0.nextX();
+ final boolean needToConvert = transformer == null;
- return new CacheEntryImpl<>(next.getKey(), next.getValue());
- }
+ return new GridCloseableIteratorAdapter<R>() {
+ @Override protected R onNext() throws IgniteCheckedException {
+ Object next = iter0.nextX();
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- return iter0.hasNextX();
- }
+ if (needToConvert) {
+ Map.Entry<K, V> entry = (Map.Entry<K, V>)next;
- @Override protected void onClose() throws IgniteCheckedException {
- iter0.close();
+ return (R) new CacheEntryImpl<>(entry.getKey(), entry.getValue());
}
- };
- }
- }, false);
- return new QueryCursorImpl<>(iter);
- }
+ return (R)next;
+ }
+
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ return iter0.hasNextX();
+ }
+
+ @Override protected void onClose() throws IgniteCheckedException {
+ iter0.close();
+ }
+ };
+ }
+ }, false);
+
+ return new QueryCursorImpl<>(iter);
+ }
+
+ /**
+ * @param filter Filter.
+ * @param grp Optional cluster group.
+ * @return Cursor.
+ */
+ @SuppressWarnings("unchecked")
+ private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+ throws IgniteCheckedException {
+ final CacheQuery<Map.Entry<K, V>> qry;
+
+ boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
final CacheQueryFuture<Map.Entry<K, V>> fut;
@@ -692,6 +717,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
}
+ if (qry instanceof ScanQuery)
+ return query((ScanQuery)qry, null, projection(qry.isLocal()));
+
return (QueryCursor<R>)query(qry, projection(qry.isLocal()));
}
catch (Exception e) {
@@ -705,6 +733,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
+ /** {@inheritDoc} */
+ @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+ A.notNull(qry, "qry");
+ A.notNull(transformer, "transformer");
+
+ if (!(qry instanceof ScanQuery))
+ throw new UnsupportedOperationException("Transformers are supported only for SCAN queries.");
+
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ validate(qry);
+
+ return query((ScanQuery<K, V>)qry, transformer, projection(qry.isLocal()));
+ }
+ catch (Exception e) {
+ if (e instanceof CacheException)
+ throw (CacheException)e;
+
+ throw new CacheException(e);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+
/**
* @return {@code true} If this is a replicated cache and we are on a data node.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 47c6e89..3fa041b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -273,15 +273,6 @@ public interface CacheQuery<T> {
public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args);
/**
- * Executes the query the same way as {@link #execute(Object...)} method but transforms result remotely.
- *
- * @param rmtTransform Remote transformer.
- * @param args Optional arguments.
- * @return Future for the query result.
- */
- public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args);
-
- /**
* Gets metrics for this query.
*
* @return Query metrics.
@@ -296,5 +287,5 @@ public interface CacheQuery<T> {
/**
* @return Scan query iterator.
*/
- public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException;
+ public GridCloseableIterator executeScanQuery() throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 5f6cb8f..d34047e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -253,7 +253,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
*/
@Nullable private GridCacheQueryInfo distributedQueryInfo(UUID sndId, GridCacheQueryRequest req) {
IgniteReducer<Object, Object> rdc = req.reducer();
- IgniteClosure<Object, Object> trans = req.transformer();
+ IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)req.transformer();
ClusterNode sndNode = cctx.node(sndId);
@@ -578,16 +578,16 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "serial"})
- @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(final GridCacheQueryAdapter qry,
+ @Override public GridCloseableIterator scanQueryDistributed(final GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException {
assert !cctx.isLocal() : cctx.name();
assert qry.type() == GridCacheQueryType.SCAN: qry;
- GridCloseableIterator<Map.Entry<K, V>> locIter0 = null;
+ GridCloseableIterator locIter0 = null;
for (ClusterNode node : nodes) {
if (node.isLocal()) {
- locIter0 = (GridCloseableIterator)scanQueryLocal(qry, false);
+ locIter0 = scanQueryLocal(qry, false);
Collection<ClusterNode> rmtNodes = new ArrayList<>(nodes.size() - 1);
@@ -603,21 +603,21 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
}
- final GridCloseableIterator<Map.Entry<K, V>> locIter = locIter0;
+ final GridCloseableIterator locIter = locIter0;
- final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null);
+ final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.<K, V>transform(), null);
- final CacheQueryFuture<Map.Entry<K, V>> fut = (CacheQueryFuture<Map.Entry<K, V>>)queryDistributed(bean, nodes);
+ final CacheQueryFuture fut = (CacheQueryFuture)queryDistributed(bean, nodes);
- return new GridCloseableIteratorAdapter<Map.Entry<K, V>>() {
+ return new GridCloseableIteratorAdapter() {
/** */
- private Map.Entry<K, V> cur;
+ private Object cur;
- @Override protected Map.Entry<K, V> onNext() throws IgniteCheckedException {
+ @Override protected Object onNext() throws IgniteCheckedException {
if (!onHasNext())
throw new NoSuchElementException();
- Map.Entry<K, V> e = cur;
+ Object e = cur;
cur = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
index 183abde..147725b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -90,7 +89,7 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V
}
/** {@inheritDoc} */
- @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+ @Override public GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException {
assert cctx.isLocal() : cctx.name();
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 90e14f4..f65b733 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -87,6 +87,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** */
private final IgniteBiPredicate<Object, Object> filter;
+ /** Transformer. */
+ private IgniteClosure<?, ?> transform;
+
/** Partition. */
private Integer part;
@@ -126,6 +129,39 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* @param cctx Context.
* @param type Query type.
+ * @param filter Scan filter.
+ * @param part Partition.
+ * @param keepBinary Keep binary flag.
+ */
+ public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
+ GridCacheQueryType type,
+ @Nullable IgniteBiPredicate<Object, Object> filter,
+ @Nullable IgniteClosure<Map.Entry, Object> transform,
+ @Nullable Integer part,
+ boolean keepBinary) {
+ assert cctx != null;
+ assert type != null;
+ assert part == null || part >= 0;
+
+ this.cctx = cctx;
+ this.type = type;
+ this.filter = filter;
+ this.transform = transform;
+ this.part = part;
+ this.keepBinary = keepBinary;
+
+ log = cctx.logger(getClass());
+
+ metrics = new GridCacheQueryMetricsAdapter();
+
+ this.incMeta = false;
+ this.clsName = null;
+ this.clause = null;
+ }
+
+ /**
+ * @param cctx Context.
+ * @param type Query type.
* @param clsName Class name.
* @param clause Clause.
* @param filter Scan filter.
@@ -376,6 +412,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/**
+ * @return Transformer.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() {
+ return (IgniteClosure<Map.Entry<K, V>, Object>) transform;
+ }
+
+ /**
* @return Partition.
*/
@Nullable public Integer partition() {
@@ -402,17 +446,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** {@inheritDoc} */
@Override public CacheQueryFuture<T> execute(@Nullable Object... args) {
- return execute(null, null, args);
+ return execute0(null, args);
}
/** {@inheritDoc} */
@Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
- return execute(rmtReducer, null, args);
- }
-
- /** {@inheritDoc} */
- @Override public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
- return execute(null, rmtTransform, args);
+ return execute0(rmtReducer, args);
}
/** {@inheritDoc} */
@@ -427,13 +466,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* @param rmtReducer Optional reducer.
- * @param rmtTransform Optional transformer.
* @param args Arguments.
* @return Future.
*/
@SuppressWarnings({"IfMayBeConditional", "unchecked"})
- private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer,
- @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
+ private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
assert type != SCAN : this;
Collection<ClusterNode> nodes;
@@ -455,7 +492,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
if (cctx.deploymentEnabled()) {
try {
- cctx.deploy().registerClasses(filter, rmtReducer, rmtTransform);
+ cctx.deploy().registerClasses(filter, rmtReducer);
cctx.deploy().registerClasses(args);
}
catch (IgniteCheckedException e) {
@@ -469,7 +506,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
taskHash = cctx.kernalContext().job().currentTaskNameHash();
final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
- (IgniteClosure<Object, Object>)rmtTransform, args);
+ null, args);
final GridCacheQueryManager qryMgr = cctx.queries();
@@ -484,8 +521,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "unchecked"})
- @Override public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException {
- assert type == SCAN: "Wrong processing of qyery: " + type;
+ @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException {
+ assert type == SCAN : "Wrong processing of qyery: " + type;
Collection<ClusterNode> nodes = nodes();
@@ -508,7 +545,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
final GridCacheQueryManager qryMgr = cctx.queries();
if (part != null && !cctx.isLocal())
- return (GridCloseableIterator<R>)new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
+ return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
else {
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
@@ -676,7 +713,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
try {
GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true);
- tuple= new T2(it, null);
+ tuple = new T2(it, null);
}
catch (IgniteClientDisconnectedCheckedException e) {
throw CU.convertToCacheException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
index 5a4d693..286ddc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
@@ -33,7 +33,7 @@ public class GridCacheQueryBean {
private final IgniteReducer<Object, Object> rdc;
/** */
- private final IgniteClosure<Object, Object> trans;
+ private final IgniteClosure<?, ?> trans;
/** */
private final Object[] args;
@@ -45,7 +45,7 @@ public class GridCacheQueryBean {
* @param args Optional arguments.
*/
public GridCacheQueryBean(GridCacheQueryAdapter<?> qry, @Nullable IgniteReducer<Object, Object> rdc,
- @Nullable IgniteClosure<Object, Object> trans, @Nullable Object[] args) {
+ @Nullable IgniteClosure<?, ?> trans, @Nullable Object[] args) {
assert qry != null;
this.qry = qry;
@@ -71,7 +71,7 @@ public class GridCacheQueryBean {
/**
* @return Transformer.
*/
- @Nullable public IgniteClosure<Object, Object> transform() {
+ @Nullable public IgniteClosure<?, ?> transform() {
return trans;
@@ -88,4 +88,4 @@ public class GridCacheQueryBean {
@Override public String toString() {
return S.toString(GridCacheQueryBean.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
index 8d2e67d..0a108d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
@@ -31,7 +31,7 @@ class GridCacheQueryInfo {
private boolean loc;
/** */
- private IgniteClosure<Object, Object> trans;
+ private IgniteClosure<?, ?> trans;
/** */
private IgniteReducer<Object, Object> rdc;
@@ -71,7 +71,7 @@ class GridCacheQueryInfo {
*/
GridCacheQueryInfo(
boolean loc,
- IgniteClosure<Object, Object> trans,
+ IgniteClosure<?, ?> trans,
IgniteReducer<Object, Object> rdc,
GridCacheQueryAdapter<?> qry,
GridCacheLocalQueryFuture<?, ?, ?> locFut,
@@ -117,7 +117,7 @@ class GridCacheQueryInfo {
/**
* @return Transformer.
*/
- IgniteClosure<?, Object> transformer() {
+ IgniteClosure<?, ?> transformer() {
return trans;
}
@@ -167,4 +167,4 @@ class GridCacheQueryInfo {
@Override public String toString() {
return S.toString(GridCacheQueryInfo.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 163bac5..454ce04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -506,7 +506,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Iterator.
* @throws IgniteCheckedException If failed.
*/
- public abstract GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+ public abstract GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException;
/**
@@ -1067,13 +1067,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final GridDhtLocalPartition locPart0 = locPart;
return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
- @Override protected void onClose() {
- super.onClose();
+ @Override protected void onClose() {
+ super.onClose();
- if (locPart0 != null)
- locPart0.release();
- }
- };
+ if (locPart0 != null)
+ locPart0.release();
+ }
+ };
}
/**
@@ -1163,9 +1163,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
- if (filter != null || locNode) {
+ if (filter != null || locNode)
val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary);
- }
if (filter != null && !filter.apply(key, val))
continue;
@@ -1187,14 +1186,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param locNode Local node.
* @return Final key-value iterator.
*/
- private GridIterator<IgniteBiTuple<K,V>> scanExpiryIterator(
+ private GridIterator<IgniteBiTuple<K, V>> scanExpiryIterator(
final Iterator<Map.Entry<byte[], byte[]>> it,
AffinityTopologyVersion topVer,
@Nullable final IgniteBiPredicate<K, V> filter,
ExpiryPolicy expPlc,
final boolean keepBinary,
boolean locNode) {
- Iterator <K> keyIter = new Iterator<K>() {
+ Iterator<K> keyIter = new Iterator<K>() {
/** {@inheritDoc} */
@Override public boolean hasNext() {
return it.hasNext();
@@ -1267,10 +1266,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)qryInfo.transformer();
IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, Object>)qryInfo.reducer();
- injectResources(trans);
injectResources(rdc);
GridCacheQueryAdapter<?> qry = qryInfo.query();
@@ -1289,7 +1286,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
res = qryInfo.local() ?
executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), qry.subjectId(), taskName,
- recipient(qryInfo.senderId(), qryInfo.requestId())) :
+ recipient(qryInfo.senderId(), qryInfo.requestId())) :
fieldsQueryResult(qryInfo, taskName);
// If metadata needs to be returned to user and cleaned from internal fields - copy it.
@@ -1460,10 +1457,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteClosure<Map.Entry<K, V>, Object> trans =
- (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+ IgniteClosure<Cache.Entry<K, V>, Object> trans =
+ (IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer();
- IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
+ IgniteReducer<Cache.Entry<K, V>, Object> rdc = (IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer();
injectResources(trans);
injectResources(rdc);
@@ -1481,13 +1478,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
res = loc ?
executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName,
- recipient(qryInfo.senderId(), qryInfo.requestId())) :
+ recipient(qryInfo.senderId(), qryInfo.requestId())) :
queryResult(qryInfo, taskName);
iter = res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()));
type = res.type();
- GridCacheAdapter<K, V> cache = cctx.cache();
+ final GridCacheAdapter<K, V> cache = cctx.cache();
if (log.isDebugEnabled())
log.debug("Received index iterator [iterHasNext=" + iter.hasNext() +
@@ -1518,7 +1515,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
break;
}
- K key = row.getKey();
+ final K key = row.getKey();
// Filter backups for SCAN queries, if it isn't partition scan.
// Other types are filtered in indexing manager.
@@ -1561,8 +1558,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
if (readEvt) {
- K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
- V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+ K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
+ V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
switch (type) {
case SQL:
@@ -1630,27 +1627,33 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
}
- Map.Entry<K, V> entry = F.t(key, val);
+ if (rdc != null || trans != null) {
+ Cache.Entry<K, V> entry;
- // Unwrap entry for reducer or transformer only.
- if (rdc != null || trans != null)
- entry = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(entry, qry.keepBinary());
+ if (qry.keepBinary())
+ entry = cache.<K, V>keepBinary().getEntry(key);
+ else
+ entry = cache.<K, V>getEntry(key);
- // Reduce.
- if (rdc != null) {
- if (!rdc.collect(entry) || !iter.hasNext()) {
- onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
+ // Reduce.
+ if (rdc != null) {
+ if (!rdc.collect(entry) || !iter.hasNext()) {
+ onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
- pageSent = true;
+ pageSent = true;
- break;
+ break;
+ }
+ else
+ continue;
}
- else
- continue;
+
+ data.add(trans != null ? trans.apply(entry) :
+ !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
}
+ else
+ data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
- data.add(trans != null ? trans.apply(entry) :
- !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
if (!loc) {
if (++cnt == pageSize || !iter.hasNext()) {
@@ -1720,7 +1723,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param updStatisticsIfNeeded Update statistics flag.
*/
@SuppressWarnings({"unchecked", "serial"})
- protected GridCloseableIterator<IgniteBiTuple<K, V>> scanQueryLocal(final GridCacheQueryAdapter qry,
+ protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry,
final boolean updStatisticsIfNeeded) throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to process query request (grid is stopping).");
@@ -1769,8 +1772,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
- return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
+ return new GridCloseableIteratorAdapter<Object>() {
+ @Override protected Object onNext() throws IgniteCheckedException {
long start = statsEnabled ? System.nanoTime() : 0L;
IgniteBiTuple<K, V> next = iter.nextX();
@@ -1803,7 +1806,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null));
}
- return next;
+ IgniteClosure transform = qry.transform();
+
+ if (transform == null)
+ return next;
+
+ Cache.Entry<K, V> entry;
+
+ if (qry.keepBinary())
+ entry = cctx.cache().keepBinary().getEntry(next.getKey());
+ else
+ entry = cctx.cache().getEntry(next.getKey());
+
+
+ return transform.apply(entry);
}
@Override protected boolean onHasNext() throws IgniteCheckedException {
@@ -1832,7 +1848,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Iterator.
* @throws IgniteCheckedException In case of error.
*/
- private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
+ private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo,
+ String taskName) throws IgniteCheckedException {
assert qryInfo != null;
final UUID sndId = qryInfo.senderId();
@@ -2845,7 +2862,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (locNode)
return new IgniteBiTuple<>(key, val);
- else{
+ else {
if (key instanceof CacheObject)
((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
@@ -3256,16 +3273,28 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param keepBinary Keep binary flag.
* @return Created query.
*/
- public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
+ public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
@Nullable Integer part, boolean keepBinary) {
+ return createScanQuery(filter, null, part, keepBinary);
+ }
- return new GridCacheQueryAdapter<>(cctx,
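+ /**
+ * Creates user's predicate based scan query with an optional transformer.
+ *
+ * @param filter Scan filter.
+ * @param trans Transformer applied to scanned entries, or {@code null} if none.
+ * @param part Partition.
+ * @param keepBinary Keep binary flag.
+ * @return Created query.
+ */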
+ public <T, R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
+ @Nullable IgniteClosure<T, R> trans,
+ @Nullable Integer part, boolean keepBinary) {
+
+ return new GridCacheQueryAdapter(cctx,
SCAN,
- null,
- null,
- (IgniteBiPredicate<Object, Object>)filter,
+ filter,
+ trans,
part,
- false,
keepBinary);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index f50fba0..5610bef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -84,7 +84,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** */
@GridDirectTransient
- private IgniteClosure<Object, Object> trans;
+ private IgniteClosure<?, ?> trans;
/** */
private byte[] transBytes;
@@ -233,7 +233,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
IgniteBiPredicate<Object, Object> keyValFilter,
@Nullable Integer part,
IgniteReducer<Object, Object> rdc,
- IgniteClosure<Object, Object> trans,
+ IgniteClosure<?, ?> trans,
int pageSize,
boolean incBackups,
Object[] args,
@@ -422,7 +422,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/**
* @return Transformer.
*/
- public IgniteClosure<Object, Object> transformer() {
+ public IgniteClosure<?, ?> transformer() {
return trans;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
new file mode 100644
index 0000000..6b13e05
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SpiQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for scan query with transformer.
+ */
+public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+ cfg.setMarshaller(null);
+
+ return cfg;
+ }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // These grids are started before client mode is switched on.
+        startGridsMultiThreaded(3);
+
+        // The default grid (the one the tests obtain through grid()) is started in client mode.
+        Ignition.setClientMode(true);
+
+        try {
+            startGrid();
+        }
+        finally {
+            // Reset the flag so that it does not affect grids started after this test class.
+            Ignition.setClientMode(false);
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
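+ /**
+ * Checks that a scan query with a key-extracting transformer returns all 50 keys that were put into the cache.
+ *
+ * @throws Exception If failed.
+ */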
+ public void testGetKeys() throws Exception {
+ IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, "val" + i);
+
+ IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, String> e) {
+ return e.getKey();
+ }
+ };
+
+ List<Integer> keys = cache.query(new ScanQuery<Integer, String>(), transformer).getAll();
+
+ assertEquals(50, keys.size());
+
+ Collections.sort(keys);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i, keys.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
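+ /**
+ * Checks that a key-extracting transformer combined with a filter ({@code k % 10 == 0}) returns only
+ * the 5 matching keys out of 50.
+ *
+ * @throws Exception If failed.
+ */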
+ public void testGetKeysFiltered() throws Exception {
+ IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, "val" + i);
+
+ IgniteBiPredicate<Integer, String> filter = new IgniteBiPredicate<Integer, String>() {
+ @Override public boolean apply(Integer k, String v) {
+ return k % 10 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, String> e) {
+ return e.getKey();
+ }
+ };
+
+ List<Integer> keys = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+ assertEquals(5, keys.size());
+
+ Collections.sort(keys);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 10, keys.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
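+ /**
+ * Checks that a transformer can return a single field ({@code idx}) of the cached value for all 50 entries.
+ *
+ * @throws Exception If failed.
+ */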
+ public void testGetObjectField() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ List<Integer> res = cache.query(new ScanQuery<Integer, Value>(), transformer).getAll();
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
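+ /**
+ * Checks that a field-extracting transformer combined with a filter ({@code idx % 1000 == 0}) returns only
+ * the 5 matching values out of 50.
+ *
+ * @throws Exception If failed.
+ */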
+ public void testGetObjectFieldFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+ @Override public boolean apply(Integer k, Value v) {
+ return v.idx % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ List<Integer> res = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
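+ /**
+ * Checks that a transformer run through a {@code withKeepBinary()} cache receives {@link BinaryObject}
+ * values and can read the {@code idx} field of all 50 entries.
+ *
+ * @throws Exception If failed.
+ */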
+ public void testKeepBinary() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ List<Integer> res = binaryCache.query(new ScanQuery<Integer, BinaryObject>(), transformer).getAll();
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
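+ /**
+ * Checks that, through a {@code withKeepBinary()} cache, both the filter and the transformer operate on
+ * {@link BinaryObject} values and only the 5 entries with {@code idx % 1000 == 0} are returned.
+ *
+ * @throws Exception If failed.
+ */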
+ public void testKeepBinaryFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+ IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+ @Override public boolean apply(Integer k, BinaryObject v) {
+ return v.<Integer>field("idx") % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ List<Integer> res = binaryCache.query(new ScanQuery<>(filter), transformer).getAll();
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
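+ /**
+ * Runs a transformer scan query with {@code setLocal(true)} inside a broadcast compute job and checks that
+ * the collected job results together contain the {@code idx} values of all 50 entries.
+ *
+ * @throws Exception If failed.
+ */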
+ public void testLocal() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ return ignite.cache("test-cache").query(new ScanQuery<Integer, Value>().setLocal(true),
+ transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
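+ /**
+ * Runs a filtered ({@code idx % 1000 == 0}) transformer scan query with {@code setLocal(true)} inside a
+ * broadcast compute job and checks that the collected job results together contain the 5 matching values.
+ *
+ * @throws Exception If failed.
+ */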
+ public void testLocalFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+ @Override public boolean apply(Integer k, Value v) {
+ return v.idx % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ return ignite.cache("test-cache").query(new ScanQuery<>(filter).setLocal(true),
+ transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
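+ /**
+ * Runs a transformer scan query with {@code setLocal(true)} on a {@code withKeepBinary()} cache inside a
+ * broadcast compute job and checks that the collected job results together contain all 50 {@code idx} values.
+ *
+ * @throws Exception If failed.
+ */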
+ public void testLocalKeepBinary() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ return ignite.cache("test-cache").withKeepBinary().query(
+ new ScanQuery<Integer, BinaryObject>().setLocal(true), transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
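+ /**
+ * Runs a filtered ({@code idx % 1000 == 0}) transformer scan query with {@code setLocal(true)} on a
+ * {@code withKeepBinary()} cache inside a broadcast compute job and checks that the collected job results
+ * together contain the 5 matching values.
+ *
+ * @throws Exception If failed.
+ */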
+ public void testLocalKeepBinaryFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+ @Override public boolean apply(Integer k, BinaryObject v) {
+ return v.<Integer>field("idx") % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ return ignite.cache("test-cache").withKeepBinary().query(new ScanQuery<>(filter).setLocal(true),
+ transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
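+ /**
+ * Checks that passing a transformer together with a SQL, SQL fields, text, SPI or continuous query fails
+ * with {@link UnsupportedOperationException} ("Transformers are supported only for SCAN queries.").
+ *
+ * @throws Exception If failed.
+ */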
+ public void testUnsupported() throws Exception {
+ final IgniteCache<Integer, Integer> cache = grid().createCache("test-cache");
+
+ final IgniteClosure<Cache.Entry<Integer, Integer>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Integer>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Integer> e) {
+ return null;
+ }
+ };
+
+ try {
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlFieldsQuery("clause"), new IgniteClosure<List<?>, Object>() {
+ @Override public Object apply(List<?> objects) {
+ return null;
+ }
+ });
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new TextQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SpiQuery<Integer, Integer>(), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new ContinuousQuery<Integer, Integer>(), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * Cache value type used by the tests in this class.
+ */
+ private static class Value {
+ /** String payload; assigned in the constructor and not read anywhere in this test class. */
+ @SuppressWarnings("unused")
+ private String str;
+
+ /**
+ * Numeric field read by the filters and transformers in this class, either directly or by its
+ * binary field name {@code "idx"}.
+ */
+ private int idx;
+
+ /**
+ * @param str String.
+ * @param idx Integer.
+ */
+ public Value(String str, int idx) {
+ this.str = str;
+ this.idx = idx;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 740b201..71dc964 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -172,6 +173,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
+ @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+ throw new UnsupportedOperationException("Method should be supported.");
+ }
+
+ /** {@inheritDoc} */
@Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 1b1908d..3652acd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryTransformerSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -115,6 +116,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class);
suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class);
suite.addTestSuite(IgniteCacheQueryNoRebalanceSelfTest.class);
+ suite.addTestSuite(GridCacheQueryTransformerSelfTest.class);
return suite;
}
[3/9] ignite git commit: ignite-2560 Support resource injection for
entry processor, optimizations for resource injection.
Posted by ak...@apache.org.
ignite-2560 Support resource injection for entry processor, optimizations for resource injection.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9ff97c9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9ff97c9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9ff97c9
Branch: refs/heads/master
Commit: f9ff97c91374dcd9cd8ad08d46d1d2de44193060
Parents: 407071e
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Aug 30 09:31:20 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Aug 30 09:32:23 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheLazyEntry.java | 2 +
.../EntryProcessorResourceInjectorProxy.java | 105 +++++
.../processors/cache/GridCacheMapEntry.java | 13 +-
.../GridNearAtomicSingleUpdateFuture.java | 17 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 18 +-
.../transactions/IgniteTxLocalAdapter.java | 5 +-
.../processors/resource/GridResourceIoc.java | 438 +++++++++++++++----
.../resource/GridResourceProcessor.java | 396 +++++++----------
.../cache/GridCacheAbstractFullApiSelfTest.java | 404 +++++++++++++++--
.../cache/GridCacheAbstractSelfTest.java | 140 +++++-
.../GridCacheTransformEventSelfTest.java | 66 ++-
...ePartitionedBasicStoreMultiNodeSelfTest.java | 2 +
.../GridTransformSpringInjectionSelfTest.java | 186 ++++++++
.../testsuites/IgniteSpringTestSuite.java | 7 +-
.../IgniteInvokeWithInjectionBenchmark.java | 74 ++++
.../IgniteInvokeWithInjectionTxBenchmark.java | 30 ++
17 files changed, 1515 insertions(+), 396 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index c8cfc99..02cccc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -191,6 +191,8 @@ public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> {
@Override public <T> T unwrap(Class<T> cls) {
if (cls.isAssignableFrom(Ignite.class))
return (T)cctx.kernalContext().grid();
+ else if (cls.isAssignableFrom(GridCacheContext.class))
+ return (T)cctx;
else if (cls.isAssignableFrom(getClass()))
return cls.cast(this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java
new file mode 100644
index 0000000..76b2511
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decorator for an {@link EntryProcessor}: injects Ignite resources into the wrapped processor
+ * before its first invocation and then forwards every call to it.
+ */
+public class EntryProcessorResourceInjectorProxy<K, V, T> implements EntryProcessor<K, V, T>, Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Wrapped entry processor. */
+    private EntryProcessor<K, V, T> delegate;
+
+    /** Set once resources have been injected into the delegate; transient, so it is not serialized. */
+    private transient boolean injected;
+
+    /**
+     * @param delegate Entry processor to wrap.
+     */
+    private EntryProcessorResourceInjectorProxy(EntryProcessor<K, V, T> delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException {
+        if (!injected)
+            injectResources(entry);
+
+        return delegate.process(entry, arguments);
+    }
+
+    /**
+     * Injects resources into the delegate using the cache context unwrapped from the entry and
+     * marks the proxy as injected on success.
+     *
+     * @param entry Entry being processed.
+     */
+    private void injectResources(MutableEntry<K, V> entry) {
+        GridCacheContext cctx = entry.unwrap(GridCacheContext.class);
+
+        GridResourceProcessor rsrcProc = cctx.kernalContext().resource();
+
+        try {
+            rsrcProc.inject(delegate, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, cctx.name());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+
+        injected = true;
+    }
+
+    /**
+     * @return Delegate entry processor.
+     */
+    public EntryProcessor<K, V, T> delegate() {
+        return delegate;
+    }
+
+    /**
+     * Wraps the given entry processor into a proxy when resource annotations are present on it.
+     *
+     * @param ctx Context.
+     * @param proc Entry processor, possibly {@code null}.
+     * @return The proxy if wrapping is needed, otherwise {@code proc} itself
+     *      ({@code null} or an already wrapped processor is returned as is).
+     */
+    public static <K, V, T> EntryProcessor<K, V, T> wrap(GridKernalContext ctx,
+        @Nullable EntryProcessor<K, V, T> proc) {
+        if (proc == null || proc instanceof EntryProcessorResourceInjectorProxy)
+            return proc;
+
+        boolean annotated = ctx.resource().isAnnotationsPresent(null, proc,
+            GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR);
+
+        if (!annotated)
+            return proc;
+
+        return new EntryProcessorResourceInjectorProxy<>(proc);
+    }
+
+    /**
+     * Returns the delegate when the argument is a proxy of this class, otherwise the argument itself.
+     *
+     * @param obj Entry processor (or any other object).
+     * @return Unwrapped entry processor.
+     */
+    static Object unwrap(Object obj) {
+        if (obj instanceof EntryProcessorResourceInjectorProxy)
+            return ((EntryProcessorResourceInjectorProxy)obj).delegate();
+
+        return obj;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f692bf4..c760ac1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -896,6 +896,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(
partition(),
key,
@@ -1004,7 +1006,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
deletedUnlocked(false);
}
- if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(
partition(),
key,
@@ -1019,6 +1023,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
transformClo != null ? transformClo.getClass().getName() : null,
taskName,
keepBinary);
+ }
}
}
@@ -1685,7 +1690,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Calculate new value.
if (op == GridCacheOperation.TRANSFORM) {
- transformCloClsName = writeObj.getClass().getName();
+ transformCloClsName = EntryProcessorResourceInjectorProxy.unwrap(writeObj).getClass().getName();
EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
@@ -2463,6 +2468,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
evtOld = cctx.unwrapTemporary(oldVal);
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(partition(), key, evtNodeId, null,
newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
@@ -2553,6 +2560,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
evtOld = cctx.unwrapTemporary(oldVal);
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+
cctx.events().addEvent(partition(), key, evtNodeId, null,
newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 661a178..256c7ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,6 +17,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -26,6 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -43,13 +51,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -549,6 +550,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2432f63..30a0c3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -800,7 +802,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
+ conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
@@ -826,6 +828,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
List<ClusterNode> affNodes = mapKey(cacheKey, topVer);
@@ -940,6 +944,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
ClusterNode primary = cctx.affinity().primary(cacheKey.partition(), topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index ac08f8f..a419887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -57,6 +57,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -432,7 +434,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
needVer);
}
-
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
@@ -511,7 +512,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
if (entry != null) {
- CacheObject v ;
+ CacheObject v;
GridCacheVersion ver;
if (needVer) {
@@ -541,7 +542,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
deserializeBinary,
true,
ver);
- }else
+ }
+ else
success = false;
}
else {
@@ -944,6 +946,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
if (op == UPDATE)
val = ctx.toCacheObject(val);
+ else if (op == TRANSFORM)
+ ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name());
while (true) {
GridCacheEntryEx entry = null;
@@ -1014,7 +1018,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
if (err != null)
throw err;
- Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, keepBinary, res.get2(), res.get1()) :
+ Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, keepBinary, res.get2(), res.get1()) :
(retval || op == TRANSFORM) ? res.get2() : res.get1();
if (op == TRANSFORM && ret == null)
@@ -1035,8 +1039,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
* @param filter Optional filter.
* @param subjId Subject ID.
* @param taskName Task name.
- * @throws CachePartialUpdateCheckedException If update failed.
* @return Results map for invoke operation.
+ * @throws CachePartialUpdateCheckedException If update failed.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
private Map<K, EntryProcessorResult> updateWithBatch(
@@ -1101,6 +1105,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
if (op == TRANSFORM) {
+ ctx.kernalContext().resource().inject(val,
+ GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR,
+ ctx.name());
+
EntryProcessor<Object, Object, Object> entryProcessor =
(EntryProcessor<Object, Object, Object>)val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 9ad7fb0..ee992cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -65,7 +66,6 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -89,6 +89,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
@@ -3664,7 +3665,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
this,
op,
val,
- entryProcessor,
+ EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProcessor),
invokeArgs,
hasDrTtl ? drTtl : -1L,
entry,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 35824fa..0158973 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -21,12 +21,12 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.util.GridLeanIdentitySet;
@@ -35,6 +35,17 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.CacheNameResource;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.resources.LoadBalancerResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.ServiceResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.resources.TaskContinuousMapperResource;
+import org.apache.ignite.resources.TaskSessionResource;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -42,17 +53,12 @@ import org.jsr166.ConcurrentHashMap8;
* Resource container contains caches for classes used for injection.
* Caches used to improve the efficiency of standard Java reflection mechanism.
*/
-class GridResourceIoc {
+public class GridResourceIoc {
/** Task class resource mapping. Used to efficiently cleanup resources related to class loader. */
- private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap =
- new ConcurrentHashMap8<>();
+ private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap = new ConcurrentHashMap8<>();
/** Class descriptors cache. */
- private final ConcurrentMap<Class<?>, ClassDescriptor> clsDescs = new ConcurrentHashMap8<>();
-
- /** */
- private final ConcurrentMap<Class<?>, Class<? extends Annotation>[]> annCache =
- new ConcurrentHashMap8<>();
+ private AtomicReference<Map<Class<?>, ClassDescriptor>> clsDescs = new AtomicReference<>();
/**
* @param ldr Class loader.
@@ -61,8 +67,22 @@ class GridResourceIoc {
Set<Class<?>> clss = taskMap.remove(ldr);
if (clss != null) {
- clsDescs.keySet().removeAll(clss);
- annCache.keySet().removeAll(clss);
+ Map<Class<?>, ClassDescriptor> newMap, oldMap;
+
+ do {
+ oldMap = clsDescs.get();
+
+ if (oldMap == null)
+ break;
+
+ newMap = new HashMap<>(oldMap.size() - clss.size());
+
+ for (Map.Entry<Class<?>, ClassDescriptor> entry : oldMap.entrySet()) {
+ if (!clss.contains(entry.getKey()))
+ newMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ while (!clsDescs.compareAndSet(oldMap, newMap));
}
}
@@ -71,8 +91,8 @@ class GridResourceIoc {
*/
void undeployAll() {
taskMap.clear();
- clsDescs.clear();
- annCache.clear();
+
+ clsDescs.set(null);
}
/**
@@ -83,8 +103,8 @@ class GridResourceIoc {
* @param injector Resource to inject.
* @param dep Deployment.
* @param depCls Deployment class.
- * @throws IgniteCheckedException Thrown in case of any errors during injection.
* @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
*/
@SuppressWarnings("SimplifiableIfStatement")
boolean inject(Object target,
@@ -92,26 +112,41 @@ class GridResourceIoc {
GridResourceInjector injector,
@Nullable GridDeployment dep,
@Nullable Class<?> depCls)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
return injectInternal(target, annCls, injector, dep, depCls, null);
}
/**
+ * @param dep Deployment.
* @param cls Class.
+ * @return Descriptor.
*/
- private ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) {
- ClassDescriptor res = clsDescs.get(cls);
+ ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) {
+ Map<Class<?>, ClassDescriptor> newMap, oldMap;
+ ClassDescriptor res, newDesc = null;
+
+ do {
+ oldMap = clsDescs.get();
+
+ if (oldMap != null && (res = oldMap.get(cls)) != null)
+ break;
- if (res == null) {
if (dep != null) {
Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
classes.add(cls);
+
+ dep = null;
}
- res = F.addIfAbsent(clsDescs, cls, new ClassDescriptor(cls));
+ if (oldMap == null)
+ newMap = new HashMap<>();
+ else
+ (newMap = new HashMap<>(oldMap.size() + 1)).putAll(oldMap);
+
+ newMap.put(cls, res = newDesc == null ? (newDesc = new ClassDescriptor(cls)) : newDesc);
}
+ while (!clsDescs.compareAndSet(oldMap, newMap));
return res;
}
@@ -123,8 +158,8 @@ class GridResourceIoc {
* @param dep Deployment.
* @param depCls Deployment class.
* @param checkedObjs Set of already inspected objects to avoid indefinite recursion.
- * @throws IgniteCheckedException Thrown in case of any errors during injection.
* @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
*/
private boolean injectInternal(Object target,
Class<? extends Annotation> annCls,
@@ -132,56 +167,14 @@ class GridResourceIoc {
@Nullable GridDeployment dep,
@Nullable Class<?> depCls,
@Nullable Set<Object> checkedObjs)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
Class<?> targetCls = target.getClass();
ClassDescriptor descr = descriptor(dep, targetCls);
T2<GridResourceField[], GridResourceMethod[]> annotatedMembers = descr.annotatedMembers(annCls);
- if (descr.recursiveFields().length == 0 && annotatedMembers == null)
- return false;
-
- if (checkedObjs == null && descr.recursiveFields().length > 0)
- checkedObjs = new GridLeanIdentitySet<>();
-
- if (checkedObjs != null && !checkedObjs.add(target))
- return false;
-
- boolean injected = false;
-
- for (Field field : descr.recursiveFields()) {
- try {
- Object obj = field.get(target);
-
- if (obj != null) {
- assert checkedObjs != null;
-
- injected |= injectInternal(obj, annCls, injector, dep, depCls, checkedObjs);
- }
- }
- catch (IllegalAccessException e) {
- throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() +
- ", target=" + target + ']', e);
- }
- }
-
- if (annotatedMembers != null) {
- for (GridResourceField field : annotatedMembers.get1()) {
- injector.inject(field, target, depCls, dep);
-
- injected = true;
- }
-
- for (GridResourceMethod mtd : annotatedMembers.get2()) {
- injector.inject(mtd, target, depCls, dep);
-
- injected = true;
- }
- }
-
- return injected;
+ return descr.injectInternal(target, annCls, annotatedMembers, injector, dep, depCls, checkedObjs);
}
/**
@@ -202,36 +195,18 @@ class GridResourceIoc {
}
/**
+ * Checks whether any annotation of the given set is present on a field or method of the specified object.
+ *
+ * @param target Target object.
+ * @param annSet Annotation classes to find on fields or methods of target object.
* @param dep Deployment.
- * @param target Target.
- * @param annClss Annotations.
- * @return Filtered set of annotations that present in target.
+ * @return {@code true} if any annotation is present, {@code false} otherwise.
*/
- @SuppressWarnings({"SuspiciousToArrayCall", "unchecked"})
- Class<? extends Annotation>[] filter(
- @Nullable GridDeployment dep, Object target,
- Collection<Class<? extends Annotation>> annClss) {
+ boolean isAnnotationsPresent(@Nullable GridDeployment dep, Object target, AnnotationSet annSet) {
assert target != null;
- assert annClss != null && !annClss.isEmpty();
+ assert annSet != null;
- Class<?> cls = target.getClass();
-
- Class<? extends Annotation>[] res = annCache.get(cls);
-
- if (res == null) {
- Collection<Class<? extends Annotation>> res0 = new ArrayList<>();
-
- for (Class<? extends Annotation> annCls : annClss) {
- if (isAnnotationPresent(target, annCls, dep))
- res0.add(annCls);
- }
-
- res = res0.toArray(new Class[res0.size()]);
-
- annCache.putIfAbsent(cls, res);
- }
-
- return res;
+ return descriptor(dep, target.getClass()).isAnnotated(annSet) != 0;
}
/**
@@ -251,16 +226,18 @@ class GridResourceIoc {
return t2 == null ? GridResourceMethod.EMPTY_ARRAY : t2.get2();
}
- /** {@inheritDoc} */
+ /** Print memory statistics */
public void printMemoryStats() {
X.println(">>> taskMapSize: " + taskMap.size());
- X.println(">>> classDescriptorsCacheSize: " + clsDescs.size());
+
+ Map<Class<?>, ClassDescriptor> map = clsDescs.get();
+ X.println(">>> classDescriptorsCacheSize: " + (map == null ? 0 : map.size()));
}
/**
*
*/
- private static class ClassDescriptor {
+ class ClassDescriptor {
/** */
private final Field[] recursiveFields;
@@ -268,8 +245,18 @@ class GridResourceIoc {
private final Map<Class<? extends Annotation>, T2<GridResourceField[], GridResourceMethod[]>> annMap;
/**
+ * Used as an enum-map with an {@link AnnotationSet} member as the key
+ * and, as the value, a bitmap of the found annotations that belong to that set (bits are indexed by {@link ResourceAnnotation}).
+ */
+ private final int[] containsAnnSets;
+
+ /** Used as an enum-map with {@link ResourceAnnotation} members as keys. */
+ private final T2<GridResourceField[], GridResourceMethod[]>[] annArr;
+
+ /**
* @param cls Class.
*/
+ @SuppressWarnings("unchecked")
ClassDescriptor(Class<?> cls) {
Map<Class<? extends Annotation>, T2<List<GridResourceField>, List<GridResourceMethod>>> annMap
= new HashMap<>();
@@ -335,20 +322,277 @@ class GridResourceIoc {
this.annMap.put(entry.getKey(), new T2<>(fields, mtds));
}
+
+ T2<GridResourceField[], GridResourceMethod[]>[] annArr = null;
+
+ if (annMap.isEmpty())
+ containsAnnSets = null;
+ else {
+ int annotationsBits = 0;
+
+ for (ResourceAnnotation ann : ResourceAnnotation.values()) {
+ T2<GridResourceField[], GridResourceMethod[]> member = annotatedMembers(ann.clazz);
+
+ if (member != null) {
+ if (annArr == null)
+ annArr = new T2[ResourceAnnotation.values().length];
+
+ annArr[ann.ordinal()] = member;
+
+ annotationsBits |= 1 << ann.ordinal();
+ }
+ }
+
+ AnnotationSet[] annotationSets = AnnotationSet.values();
+
+ containsAnnSets = new int[annotationSets.length];
+
+ for (int i = 0; i < annotationSets.length; i++)
+ containsAnnSets[i] = annotationsBits & annotationSets[i].annotationsBitSet;
+ }
+
+ this.annArr = annArr;
}
/**
* @return Recursive fields.
*/
- public Field[] recursiveFields() {
+ Field[] recursiveFields() {
return recursiveFields;
}
/**
+ * @param annCls Annotation class.
* @return Fields.
*/
- @Nullable public T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) {
+ @Nullable T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) {
return annMap.get(annCls);
}
+
+ /**
+ * Reports which annotations of the given set may be present on the described class.
+ *
+ * @param set Annotation set.
+ * @return The whole bitmask of the set if the class has recursive fields, otherwise the
+ * precomputed bitmask for this set, or 0 when nothing was precomputed.
+ */
+ int isAnnotated(AnnotationSet set) {
+ // With recursive fields the full mask of the set is reported.
+ if (recursiveFields.length > 0)
+ return set.annotationsBitSet;
+
+ if (containsAnnSets == null)
+ return 0;
+
+ return containsAnnSets[set.ordinal()];
+ }
+
+ /**
+ * Tells whether the given annotation may be present on the described class.
+ *
+ * @param ann Annotation.
+ * @return {@code True} if the class has recursive fields or has members recorded for the annotation.
+ */
+ boolean isAnnotated(ResourceAnnotation ann) {
+ if (recursiveFields.length > 0)
+ return true;
+
+ if (annArr == null)
+ return false;
+
+ return annArr[ann.ordinal()] != null;
+ }
+
+ /**
+ * Injects a resource into the members of the target that carry the given annotation and,
+ * through the recursive fields of this descriptor, into the objects those fields reference.
+ *
+ * @param target Target object.
+ * @param annCls Annotation class.
+ * @param annotatedMembers Fields and methods to inject into; may be {@code null} when there are none.
+ * @param injector Resource to inject.
+ * @param dep Deployment.
+ * @param depCls Deployment class.
+ * @param checkedObjs Set of already inspected objects to avoid indefinite recursion.
+ * @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
+ */
+ boolean injectInternal(Object target,
+ Class<? extends Annotation> annCls,
+ T2<GridResourceField[], GridResourceMethod[]> annotatedMembers,
+ GridResourceInjector injector,
+ @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls,
+ @Nullable Set<Object> checkedObjs)
+ throws IgniteCheckedException {
+ if (recursiveFields.length == 0 && annotatedMembers == null) // Nothing to recurse into and nothing to inject.
+ return false;
+
+ if (checkedObjs == null && recursiveFields.length > 0) // The visited set is created only when recursion will happen.
+ checkedObjs = new GridLeanIdentitySet<>();
+
+ if (checkedObjs != null && !checkedObjs.add(target)) // Target was already inspected during this traversal.
+ return false;
+
+ boolean injected = false;
+
+ for (Field field : recursiveFields) {
+ try {
+ Object obj = field.get(target);
+
+ if (obj != null) {
+ assert checkedObjs != null;
+
+ ClassDescriptor desc = descriptor(dep, obj.getClass()); // Descriptor of the referenced object's own class.
+ injected |= desc.injectInternal(obj, annCls, desc.annotatedMembers(annCls),
+ injector, dep, depCls, checkedObjs);
+ }
+ }
+ catch (IllegalAccessException e) {
+ throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() +
+ ", target=" + target + ']', e);
+ }
+ }
+
+ if (annotatedMembers != null) {
+ for (GridResourceField field : annotatedMembers.get1()) {
+ injector.inject(field, target, depCls, dep);
+
+ injected = true;
+ }
+
+ for (GridResourceMethod mtd : annotatedMembers.get2()) {
+ injector.inject(mtd, target, depCls, dep);
+
+ injected = true;
+ }
+ }
+
+ return injected;
+ }
+
+ /**
+ * Injects a resource for the given resource annotation, starting a fresh traversal
+ * (no set of already inspected objects is passed in).
+ *
+ * @param target Target object.
+ * @param ann Resource annotation whose members should receive the resource.
+ * @param injector Resource to inject.
+ * @param dep Deployment.
+ * @param depCls Deployment class.
+ * @return {@code True} if resource was injected.
+ * @throws IgniteCheckedException Thrown in case of any errors during injection.
+ */
+ public boolean inject(Object target,
+ ResourceAnnotation ann,
+ GridResourceInjector injector,
+ @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls)
+ throws IgniteCheckedException {
+ // Members recorded for this annotation, if any were found for the class.
+ T2<GridResourceField[], GridResourceMethod[]> members = null;
+
+ if (annArr != null)
+ members = annArr[ann.ordinal()];
+
+ return injectInternal(target, ann.clazz, members, injector, dep, depCls, null);
+ }
+ }
+
+ /**
+ * Resource annotations handled by this container; each constant holds the annotation class it stands for.
+ * Ordinals of these constants are used in this file as bit positions and array indexes,
+ * so the declaration order is significant.
+ */
+ enum ResourceAnnotation {
+ /** Stands for {@link CacheNameResource}. */
+ CACHE_NAME(CacheNameResource.class),
+
+ /** Stands for {@link SpringApplicationContextResource}. */
+ SPRING_APPLICATION_CONTEXT(SpringApplicationContextResource.class),
+
+ /** Stands for {@link SpringResource}. */
+ SPRING(SpringResource.class),
+
+ /** Stands for {@link IgniteInstanceResource}. */
+ IGNITE_INSTANCE(IgniteInstanceResource.class),
+
+ /** Stands for {@link LoggerResource}. */
+ LOGGER(LoggerResource.class),
+
+ /** Stands for {@link ServiceResource}. */
+ SERVICE(ServiceResource.class),
+
+ /** Stands for {@link TaskSessionResource}. */
+ TASK_SESSION(TaskSessionResource.class),
+
+ /** Stands for {@link LoadBalancerResource}. */
+ LOAD_BALANCER(LoadBalancerResource.class),
+
+ /** Stands for {@link TaskContinuousMapperResource}. */
+ TASK_CONTINUOUS_MAPPER(TaskContinuousMapperResource.class),
+
+ /** Stands for {@link JobContextResource}. */
+ JOB_CONTEXT(JobContextResource.class),
+
+ /** Stands for {@link CacheStoreSessionResource}. */
+ CACHE_STORE_SESSION(CacheStoreSessionResource.class);
+
+ /** Annotation class this constant stands for. */
+ public final Class<? extends Annotation> clazz;
+
+ /**
+ * @param clazz Annotation class this constant stands for.
+ */
+ ResourceAnnotation(Class<? extends Annotation> clazz) {
+ this.clazz = clazz;
+ }
+ }
+
+ /**
+ * Named groups of {@link ResourceAnnotation}s. Each group is also precomputed into an {@code int}
+ * bitmask in which bit {@code n} is set when the annotation with ordinal {@code n} belongs to the group.
+ */
+ public enum AnnotationSet {
+ /** Spring application context, Spring, Ignite instance, logger and service annotations. */
+ GENERIC(
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ ),
+
+ /** Cache name annotation plus the same annotations as {@link #GENERIC}. */
+ ENTRY_PROCESSOR(
+ ResourceAnnotation.CACHE_NAME,
+
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ ),
+
+ /** Task session, load balancer and continuous mapper annotations plus the same annotations as {@link #GENERIC}. */
+ TASK(
+ ResourceAnnotation.TASK_SESSION,
+ ResourceAnnotation.LOAD_BALANCER,
+ ResourceAnnotation.TASK_CONTINUOUS_MAPPER,
+
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ ),
+
+ /** Task session and job context annotations plus the same annotations as {@link #GENERIC}. */
+ JOB(
+ ResourceAnnotation.TASK_SESSION,
+ ResourceAnnotation.JOB_CONTEXT,
+
+ ResourceAnnotation.SPRING_APPLICATION_CONTEXT,
+ ResourceAnnotation.SPRING,
+ ResourceAnnotation.IGNITE_INSTANCE,
+ ResourceAnnotation.LOGGER,
+ ResourceAnnotation.SERVICE
+ );
+
+ /** Resource annotations bits for fast checks. */
+ public final int annotationsBitSet;
+
+ /** Annotations of this set, in the order they were passed to the constructor. */
+ public final ResourceAnnotation[] annotations;
+
+ /**
+ * @param annotations Resource annotations that make up this set.
+ */
+ AnnotationSet(ResourceAnnotation... annotations) {
+ this.annotations = annotations;
+
+ int mask = 0;
+
+ for (ResourceAnnotation ann : annotations) {
+ // The mask is an int: an ordinal of Integer.SIZE or more would silently alias a lower bit,
+ // so it is the ordinal (not the number of arguments) that has to be bounded.
+ assert ann.ordinal() < Integer.SIZE : ann;
+
+ mask |= 1 << ann.ordinal();
+ }
+
+ annotationsBitSet = mask;
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index afe0ef1..84d07b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -20,12 +20,11 @@ package org.apache.ignite.internal.processors.resource;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Arrays;
import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeLoadBalancer;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskContinuousMapper;
@@ -34,22 +33,10 @@ import org.apache.ignite.internal.GridInternalWrapper;
import org.apache.ignite.internal.GridJobContextImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lifecycle.LifecycleBean;
-import org.apache.ignite.resources.CacheNameResource;
-import org.apache.ignite.resources.CacheStoreSessionResource;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.JobContextResource;
-import org.apache.ignite.resources.LoadBalancerResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.resources.ServiceResource;
-import org.apache.ignite.resources.SpringApplicationContextResource;
-import org.apache.ignite.resources.SpringResource;
-import org.apache.ignite.resources.TaskContinuousMapperResource;
-import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.spi.IgniteSpi;
import org.jetbrains.annotations.Nullable;
@@ -58,42 +45,6 @@ import org.jetbrains.annotations.Nullable;
* Processor for all Ignite and task/job resources.
*/
public class GridResourceProcessor extends GridProcessorAdapter {
- /** */
- private static final Collection<Class<? extends Annotation>> JOB_INJECTIONS = Arrays.asList(
- TaskSessionResource.class,
- JobContextResource.class,
- IgniteInstanceResource.class,
- SpringApplicationContextResource.class,
- SpringResource.class,
- LoggerResource.class,
- ServiceResource.class);
-
- /** */
- private static final Collection<Class<? extends Annotation>> TASK_INJECTIONS = Arrays.asList(
- TaskSessionResource.class,
- LoadBalancerResource.class,
- TaskContinuousMapperResource.class,
- IgniteInstanceResource.class,
- SpringApplicationContextResource.class,
- SpringResource.class,
- LoggerResource.class,
- ServiceResource.class);
-
- /** Grid instance injector. */
- private GridResourceBasicInjector<IgniteEx> gridInjector;
-
- /** Spring application context injector. */
- private GridResourceInjector springCtxInjector;
-
- /** Logger injector. */
- private GridResourceBasicInjector<IgniteLogger> logInjector;
-
- /** Services injector. */
- private GridResourceBasicInjector<Collection<Service>> srvcInjector;
-
- /** Spring bean resources injector. */
- private GridResourceInjector springBeanInjector;
-
/** Cleaning injector. */
private final GridResourceInjector nullInjector = new GridResourceBasicInjector<>(null);
@@ -103,6 +54,9 @@ public class GridResourceProcessor extends GridProcessorAdapter {
/** */
private final GridResourceIoc ioc = new GridResourceIoc();
+ /** */
+ private final GridResourceInjector[] injectorByAnnotation;
+
/**
* Creates resources processor.
*
@@ -111,9 +65,14 @@ public class GridResourceProcessor extends GridProcessorAdapter {
public GridResourceProcessor(GridKernalContext ctx) {
super(ctx);
- gridInjector = new GridResourceBasicInjector<>(ctx.grid());
- logInjector = new GridResourceLoggerInjector(ctx.config().getGridLogger());
- srvcInjector = new GridResourceServiceInjector(ctx.grid());
+ injectorByAnnotation = new GridResourceInjector[GridResourceIoc.ResourceAnnotation.values().length];
+
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SERVICE.ordinal()] =
+ new GridResourceServiceInjector(ctx.grid());
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.LOGGER.ordinal()] =
+ new GridResourceLoggerInjector(ctx.config().getGridLogger());
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] =
+ new GridResourceBasicInjector<>(ctx.grid());
}
/** {@inheritDoc} */
@@ -138,8 +97,12 @@ public class GridResourceProcessor extends GridProcessorAdapter {
public void setSpringContext(@Nullable GridSpringResourceContext rsrcCtx) {
this.rsrcCtx = rsrcCtx;
- springCtxInjector = rsrcCtx != null ? rsrcCtx.springContextInjector() : nullInjector;
- springBeanInjector = rsrcCtx != null ? rsrcCtx.springBeanInjector() : nullInjector;
+ GridResourceInjector springCtxInjector = rsrcCtx != null ? rsrcCtx.springContextInjector() : nullInjector;
+ GridResourceInjector springBeanInjector = rsrcCtx != null ? rsrcCtx.springBeanInjector() : nullInjector;
+
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING.ordinal()] = springBeanInjector;
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING_APPLICATION_CONTEXT.ordinal()] =
+ springCtxInjector;
}
/**
@@ -187,17 +150,15 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void inject(GridDeployment dep, Class<?> depCls, Object target) throws IgniteCheckedException {
+ assert target != null;
+
if (log.isDebugEnabled())
log.debug("Injecting resources: " + target);
// Unwrap Proxy object.
target = unwrapTarget(target);
- ioc.inject(target, IgniteInstanceResource.class, gridInjector, dep, depCls);
- ioc.inject(target, SpringApplicationContextResource.class, springCtxInjector, dep, depCls);
- ioc.inject(target, SpringResource.class, springBeanInjector, dep, depCls);
- ioc.inject(target, LoggerResource.class, logInjector, dep, depCls);
- ioc.inject(target, ServiceResource.class, srvcInjector, dep, depCls);
+ inject(target, GridResourceIoc.AnnotationSet.GENERIC, dep, depCls);
}
/**
@@ -216,7 +177,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
obj = unwrapTarget(obj);
- ioc.inject(obj, CacheNameResource.class, new GridResourceBasicInjector<>(cacheName), null, null);
+ inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_NAME, null, null, cacheName);
}
/**
@@ -236,7 +197,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
obj = unwrapTarget(obj);
- return ioc.inject(obj, CacheStoreSessionResource.class, new GridResourceBasicInjector<>(ses), null, null);
+ return inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_STORE_SESSION, null, null, ses);
}
/**
@@ -244,6 +205,17 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed to inject.
*/
public void injectGeneric(Object obj) throws IgniteCheckedException {
+ inject(obj, GridResourceIoc.AnnotationSet.GENERIC);
+ }
+
+ /**
+ * @param obj Object to inject.
+ * @param annSet Supported annotations.
+ * @param params Parameters.
+ * @throws IgniteCheckedException If failed to inject.
+ */
+ public void inject(Object obj, GridResourceIoc.AnnotationSet annSet, Object... params)
+ throws IgniteCheckedException {
assert obj != null;
if (log.isDebugEnabled())
@@ -252,33 +224,126 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
obj = unwrapTarget(obj);
- // No deployment for lifecycle beans.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
+ inject(obj, annSet, null, null, params);
+ }
+
+ /**
+ * @param obj Object to inject.
+ * @param annSet Supported annotations.
+ * @param dep Deployment.
+ * @param depCls Deployment class.
+ * @param params Parameters.
+ * @throws IgniteCheckedException If failed to inject.
+ */
+ private void inject(Object obj,
+ GridResourceIoc.AnnotationSet annSet,
+ @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls,
+ Object... params)
+ throws IgniteCheckedException {
+ GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass());
+
+ assert clsDesc != null;
+
+ if (clsDesc.isAnnotated(annSet) == 0)
+ return;
+
+ int i = 0;
+ for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations) {
+ if (clsDesc.isAnnotated(ann)) {
+ final GridResourceInjector injector = injectorByAnnotation(ann, i < params.length ? params[i] : null);
+
+ if (injector != null)
+ clsDesc.inject(obj, ann, injector, dep, depCls);
+ }
+
+ i++;
+ }
}
/**
* @param obj Object.
+ * @param annSet Supported annotations.
* @throws IgniteCheckedException If failed.
*/
- public void cleanupGeneric(Object obj) throws IgniteCheckedException {
- if (obj != null) {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + obj);
-
- // Unwrap Proxy object.
- obj = unwrapTarget(obj);
-
- // Caching key is null for the life-cycle beans.
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
+ private void cleanup(Object obj, GridResourceIoc.AnnotationSet annSet)
+ throws IgniteCheckedException {
+ assert obj != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Cleaning up resources: " + obj);
+
+ // Unwrap Proxy object.
+ obj = unwrapTarget(obj);
+
+ GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass());
+
+ assert clsDesc != null;
+
+ if (clsDesc.isAnnotated(annSet) == 0)
+ return;
+
+ for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations)
+ clsDesc.inject(obj, ann, nullInjector, null, null);
+ }
+
+ /**
+ * @param ann Annotation.
+ * @param param Injector parameter.
+ * @return Injector.
+ */
+ private GridResourceInjector injectorByAnnotation(GridResourceIoc.ResourceAnnotation ann, Object param) {
+ final GridResourceInjector res;
+
+ switch (ann) {
+ case CACHE_NAME:
+ case TASK_SESSION:
+ case LOAD_BALANCER:
+ case TASK_CONTINUOUS_MAPPER:
+ case CACHE_STORE_SESSION:
+ res = new GridResourceBasicInjector<>(param);
+ break;
+
+ case JOB_CONTEXT:
+ res = new GridResourceJobContextInjector((ComputeJobContext)param);
+ break;
+
+ default:
+ res = injectorByAnnotation[ann.ordinal()];
+ break;
}
+
+ return res;
+ }
+
+ /**
+ * @param obj Object to inject.
+ * @throws IgniteCheckedException If failed to inject.
+ */
+ private boolean inject(Object obj, GridResourceIoc.ResourceAnnotation ann, @Nullable GridDeployment dep,
+ @Nullable Class<?> depCls, Object param)
+ throws IgniteCheckedException {
+ GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass());
+
+ assert clsDesc != null;
+
+ if (clsDesc.isAnnotated(ann)) {
+ GridResourceInjector injector = injectorByAnnotation(ann, param);
+
+ if (injector != null)
+ return clsDesc.inject(obj, ann, injector, dep, depCls);
+ }
+
+ return false;
+ }
+
+ /**
+ * @param obj Object.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void cleanupGeneric(Object obj) throws IgniteCheckedException {
+ if (obj != null)
+ cleanup(obj, GridResourceIoc.AnnotationSet.GENERIC);
}
/**
@@ -321,30 +386,8 @@ public class GridResourceProcessor extends GridProcessorAdapter {
*/
private void injectToJob(GridDeployment dep, Class<?> taskCls, Object job, ComputeTaskSession ses,
GridJobContextImpl jobCtx) throws IgniteCheckedException {
- Class<? extends Annotation>[] filtered = ioc.filter(dep, job, JOB_INJECTIONS);
-
- if (filtered.length > 0) {
- for (Class<? extends Annotation> annCls : filtered) {
- if (annCls == TaskSessionResource.class)
- injectBasicResource(job, TaskSessionResource.class, ses, dep, taskCls);
- else if (annCls == JobContextResource.class)
- ioc.inject(job, JobContextResource.class, new GridResourceJobContextInjector(jobCtx),
- dep, taskCls);
- else if (annCls == IgniteInstanceResource.class)
- ioc.inject(job, IgniteInstanceResource.class, gridInjector, dep, taskCls);
- else if (annCls == SpringApplicationContextResource.class)
- ioc.inject(job, SpringApplicationContextResource.class, springCtxInjector, dep, taskCls);
- else if (annCls == SpringResource.class)
- ioc.inject(job, SpringResource.class, springBeanInjector, dep, taskCls);
- else if (annCls == LoggerResource.class)
- ioc.inject(job, LoggerResource.class, logInjector, dep, taskCls);
- else {
- assert annCls == ServiceResource.class;
-
- ioc.inject(job, ServiceResource.class, srvcInjector, dep, taskCls);
- }
- }
- }
+
+ inject(job, GridResourceIoc.AnnotationSet.JOB, dep, taskCls, ses, jobCtx);
}
/**
@@ -365,34 +408,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
// Unwrap Proxy object.
Object obj = unwrapTarget(task);
- Class<? extends Annotation>[] filtered = ioc.filter(dep, obj, TASK_INJECTIONS);
-
- if (filtered.length == 0)
- return;
-
- Class<?> taskCls = obj.getClass();
-
- for (Class<? extends Annotation> annCls : filtered) {
- if (annCls == TaskSessionResource.class)
- injectBasicResource(obj, TaskSessionResource.class, ses, dep, taskCls);
- else if (annCls == LoadBalancerResource.class)
- injectBasicResource(obj, LoadBalancerResource.class, balancer, dep, taskCls);
- else if (annCls == TaskContinuousMapperResource.class)
- injectBasicResource(obj, TaskContinuousMapperResource.class, mapper, dep, taskCls);
- else if (annCls == IgniteInstanceResource.class)
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, dep, taskCls);
- else if (annCls == SpringApplicationContextResource.class)
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, dep, taskCls);
- else if (annCls == SpringResource.class)
- ioc.inject(obj, SpringResource.class, springBeanInjector, dep, taskCls);
- else if (annCls == LoggerResource.class)
- ioc.inject(obj, LoggerResource.class, logInjector, dep, taskCls);
- else {
- assert annCls == ServiceResource.class;
-
- ioc.inject(obj, ServiceResource.class, srvcInjector, dep, taskCls);
- }
- }
+ inject(obj, GridResourceIoc.AnnotationSet.TASK, dep, null, ses, balancer, mapper);
}
/**
@@ -408,24 +424,25 @@ public class GridResourceProcessor extends GridProcessorAdapter {
}
/**
+ * Checks whether any of the given annotations is present in the specified object.
+ *
+ * @param dep Class deployment.
+ * @param target Object to check.
+ * @param annSet Annotations to find.
+ * @return {@code true} if any annotation is present, {@code false} otherwise.
+ */
+ public boolean isAnnotationsPresent(GridDeployment dep, Object target, GridResourceIoc.AnnotationSet annSet) {
+ return ioc.isAnnotationsPresent(dep, target, annSet);
+ }
+
+ /**
* Injects held resources into given SPI implementation.
*
* @param spi SPI implementation.
* @throws IgniteCheckedException Throw in case of any errors.
*/
public void inject(IgniteSpi spi) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Injecting resources: " + spi);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(spi);
-
- // Caching key is null for the SPIs.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
+ injectGeneric(spi);
}
/**
@@ -436,17 +453,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void cleanup(IgniteSpi spi) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + spi);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(spi);
-
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
+ cleanupGeneric(spi);
}
/**
@@ -456,18 +463,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void inject(LifecycleBean lifecycleBean) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Injecting resources: " + lifecycleBean);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(lifecycleBean);
-
- // No deployment for lifecycle beans.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
+ injectGeneric(lifecycleBean);
}
/**
@@ -478,18 +474,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void cleanup(LifecycleBean lifecycleBean) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + lifecycleBean);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(lifecycleBean);
-
- // Caching key is null for the life-cycle beans.
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
+ cleanupGeneric(lifecycleBean);
}
/**
@@ -499,18 +484,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
public void inject(Service svc) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Injecting resources: " + svc);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(svc);
-
- // No deployment for lifecycle beans.
- ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null);
- ioc.inject(obj, SpringResource.class, springBeanInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null);
- ioc.inject(obj, LoggerResource.class, logInjector, null, null);
- ioc.inject(obj, ServiceResource.class, srvcInjector, null, null);
+ injectGeneric(svc);
}
/**
@@ -521,39 +495,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void cleanup(Service svc) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Cleaning up resources: " + svc);
-
- // Unwrap Proxy object.
- Object obj = unwrapTarget(svc);
-
- // Caching key is null for the life-cycle beans.
- ioc.inject(obj, LoggerResource.class, nullInjector, null, null);
- ioc.inject(obj, ServiceResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null);
- ioc.inject(obj, SpringResource.class, nullInjector, null, null);
- ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null);
- }
-
- /**
- * This method is declared public as it is used from tests as well.
- * Note, that this method can be used only with unwrapped objects
- * (see {@link #unwrapTarget(Object)}).
- *
- * @param target Target object.
- * @param annCls Setter annotation.
- * @param rsrc Resource to inject.
- * @param dep Deployment.
- * @param depCls Deployed class.
- * @throws IgniteCheckedException If injection failed.
- */
- public void injectBasicResource(Object target, Class<? extends Annotation> annCls, Object rsrc,
- GridDeployment dep, Class<?> depCls) throws IgniteCheckedException {
- // Safety.
- assert !(rsrc instanceof GridResourceInjector) : "Invalid injection.";
-
- // Basic injection don't cache anything. Use null as a key.
- ioc.inject(target, annCls, new GridResourceBasicInjector<>(rsrc), dep, depCls);
+ cleanupGeneric(svc);
}
/**
@@ -602,4 +544,4 @@ public class GridResourceProcessor extends GridProcessorAdapter {
ioc.printMemoryStats();
}
-}
\ No newline at end of file
+}
[7/9] ignite git commit: Merge remote-tracking branch
'ignite-gg/ignite-1.6.7' into ignite-1.7.2
Posted by ak...@apache.org.
Merge remote-tracking branch 'ignite-gg/ignite-1.6.7' into ignite-1.7.2
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
# modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
# modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a259da2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a259da2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a259da2
Branch: refs/heads/master
Commit: 4a259da296946740e180d25559e5cacba60a97ad
Parents: fea3eea 1ef150e
Author: EdShangGG <es...@gridgain.com>
Authored: Tue Aug 30 14:39:13 2016 +0300
Committer: EdShangGG <es...@gridgain.com>
Committed: Tue Aug 30 14:39:13 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 15 +
.../processors/cache/CacheLazyEntry.java | 2 +
.../EntryProcessorResourceInjectorProxy.java | 105 ++++
.../processors/cache/GridCacheMapEntry.java | 13 +-
.../processors/cache/IgniteCacheProxy.java | 112 +++-
.../GridNearAtomicSingleUpdateFuture.java | 17 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 18 +-
.../processors/cache/query/CacheQuery.java | 11 +-
.../query/GridCacheDistributedQueryManager.java | 22 +-
.../cache/query/GridCacheLocalQueryManager.java | 3 +-
.../cache/query/GridCacheQueryAdapter.java | 69 ++-
.../cache/query/GridCacheQueryBean.java | 8 +-
.../cache/query/GridCacheQueryInfo.java | 8 +-
.../cache/query/GridCacheQueryManager.java | 125 ++--
.../cache/query/GridCacheQueryRequest.java | 6 +-
.../transactions/IgniteTxLocalAdapter.java | 5 +-
.../processors/odbc/escape/OdbcEscapeUtils.java | 115 ++--
.../processors/resource/GridResourceIoc.java | 438 ++++++++++----
.../resource/GridResourceProcessor.java | 396 ++++++-------
.../cache/GridCacheAbstractFullApiSelfTest.java | 393 +++++++++++--
.../cache/GridCacheAbstractSelfTest.java | 140 ++++-
.../GridCacheTransformEventSelfTest.java | 66 ++-
...ePartitionedBasicStoreMultiNodeSelfTest.java | 2 +
.../GridCacheQueryTransformerSelfTest.java | 570 +++++++++++++++++++
.../odbc/OdbcEscapeSequenceSelfTest.java | 164 +++++-
.../multijvm/IgniteCacheProcessProxy.java | 6 +
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../GridTransformSpringInjectionSelfTest.java | 186 ++++++
.../testsuites/IgniteSpringTestSuite.java | 7 +-
.../commands/cache/VisorCacheStopCommand.scala | 5 +-
.../IgniteInvokeWithInjectionBenchmark.java | 74 +++
.../IgniteInvokeWithInjectionTxBenchmark.java | 30 +
33 files changed, 2564 insertions(+), 577 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a259da2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a259da2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 9935f93,454ce04..4b4275f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -1439,12 -1478,9 +1436,12 @@@ public abstract class GridCacheQueryMan
res = loc ?
executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName,
- recipient(qryInfo.senderId(), qryInfo.requestId())) :
+ recipient(qryInfo.senderId(), qryInfo.requestId())) :
queryResult(qryInfo, taskName);
+ if (res == null)
+ return;
+
iter = res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()));
type = res.type();
@@@ -1793,7 -1848,8 +1809,8 @@@
* @return Iterator.
* @throws IgniteCheckedException In case of error.
*/
- @Nullable private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
- private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo,
++ @Nullable private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo,
+ String taskName) throws IgniteCheckedException {
assert qryInfo != null;
final UUID sndId = qryInfo.senderId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a259da2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a259da2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 0203354,3652acd..4198535
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@@ -126,18 -116,7 +127,19 @@@ public class IgniteCacheQuerySelfTestSu
suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class);
suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class);
suite.addTestSuite(IgniteCacheQueryNoRebalanceSelfTest.class);
+ suite.addTestSuite(GridCacheQueryTransformerSelfTest.class);
+ suite.addTestSuite(IgniteCachePrimitiveFieldsQuerySelfTest.class);
+
+ suite.addTestSuite(IgniteCacheJoinQueryWithAffinityKeyTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinCollocatedAndNotTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinPartitionedAndReplicatedTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinQueryConditionsTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinTest.class);
+ suite.addTestSuite(IgniteCacheJoinPartitionedAndReplicatedTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinNoIndexTest.class);
+ suite.addTestSuite(IgniteCrossCachesJoinsQueryTest.class);
+ suite.addTestSuite(IgniteCacheCrossCacheJoinRandomTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinCustomAffinityMapper.class);
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a259da2/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
----------------------------------------------------------------------
diff --cc modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index 3b70556,67b117d..587d46b
--- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@@ -18,7 -18,8 +18,9 @@@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+ import org.apache.ignite.cache.spring.GridSpringCacheManagerSelfTest;
+ import org.apache.ignite.cache.spring.SpringCacheManagerContextInjectionTest;
+import org.apache.ignite.internal.IgniteSpringBeanTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactorySelfTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactorySelfTest;
import org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListenerSelfTest;
[2/9] ignite git commit: ignite-2560 Support resource injection for
entry processor, optimizations for resource injection.
Posted by ak...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 32d46e2..3f4d812 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,7 +32,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import javax.cache.Cache;
@@ -46,10 +47,13 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import junit.framework.AssertionFailedError;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
@@ -61,12 +65,15 @@ import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.lang.IgnitePair;
@@ -76,10 +83,16 @@ import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.CacheNameResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.ServiceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
@@ -124,6 +137,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/** Test timeout */
private static final long TEST_TIMEOUT = 60 * 1000;
+ /** Service name. */
+ private static final String SERVICE_NAME1 = "testService1";
+
/** */
public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
new CacheEntryProcessor<String, Integer, String>() {
@@ -202,6 +218,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES)
cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
+ // Make sure EVT_CACHE_OBJECT_READ is recorded in addition to whatever event types are already configured.
+ int[] evtTypes = cfg.getIncludeEventTypes();
+
+ if (evtTypes == null || evtTypes.length == 0)
+ cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_READ);
+ else {
+ for (int evtType : evtTypes) {
+ // Already enabled, nothing to change.
+ if (evtType == EventType.EVT_CACHE_OBJECT_READ)
+ return cfg;
+ }
+
+ int[] updatedEvtTypes = Arrays.copyOf(evtTypes, evtTypes.length + 1);
+
+ updatedEvtTypes[updatedEvtTypes.length - 1] = EventType.EVT_CACHE_OBJECT_READ;
+
+ // Apply the extended array; without this call the copy built above is simply discarded.
+ cfg.setIncludeEventTypes(updatedEvtTypes);
+ }
+
return cfg;
}
@@ -261,6 +292,18 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cacheCfgMap = null;
}
+ // We won't deploy service unless non-client node is configured.
+ for (int i = 0; i < gridCount(); i++) {
+ // Null-safe comparison: a missing (null) client mode flag is treated as "not a client"
+ // instead of failing with NPE on auto-unboxing.
+ if (Boolean.TRUE.equals(grid(i).configuration().isClientMode()))
+ continue;
+
+ grid(0).services(grid(0).cluster()).deployNodeSingleton(SERVICE_NAME1, new DummyServiceImpl());
+
+ break;
+ }
+
for (int i = 0; i < gridCount(); i++)
info("Grid " + i + ": " + grid(i).localNode().id());
}
@@ -619,9 +662,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.put("key1", 1);
cache.put("key2", 2);
- CacheEntry<String, Integer> key1e = cache.getEntry("key1");
- CacheEntry<String, Integer> key2e = cache.getEntry("key2");
- CacheEntry<String, Integer> wrongKeye = cache.getEntry("wrongKey");
+ CacheEntry<String, Integer> key1e = cache.getEntry("key1");
+ CacheEntry<String, Integer> key2e = cache.getEntry("key2");
+ CacheEntry<String, Integer> wrongKeye = cache.getEntry("wrongKey");
assert key1e.getValue() == 1;
assert key1e.getKey().equals("key1");
@@ -781,7 +824,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
boolean b1 = false;
boolean b2 = false;
- for (CacheEntry<String, Integer> e: c1){
+ for (CacheEntry<String, Integer> e : c1) {
if (e.getKey().equals("key1") && e.getValue().equals(1))
b1 = true;
@@ -800,7 +843,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
b1 = false;
b2 = false;
- for (CacheEntry<String, Integer> e: c2){
+ for (CacheEntry<String, Integer> e : c2) {
if (e.getKey().equals("key1") && e.getValue().equals(1))
b1 = true;
@@ -1481,8 +1524,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
private void checkTransformReturnValue(boolean put,
TransactionConcurrency concurrency,
TransactionIsolation isolation)
- throws Exception
- {
+ throws Exception {
IgniteCache<String, Integer> cache = jcache();
if (!put)
@@ -1790,7 +1832,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.put(key, 1);
- assertEquals(1, (int) cache.get(key));
+ assertEquals(1, (int)cache.get(key));
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -1808,7 +1850,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
}, NullPointerException.class, null);
- assertEquals(1, (int) cache.get(key));
+ assertEquals(1, (int)cache.get(key));
cache.put(key, 2);
@@ -1839,7 +1881,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("k1"));
assertNull(cache.get("k2"));
- assertEquals(2, (int) cache.get(key));
+ assertEquals(2, (int)cache.get(key));
cache.put(key, 3);
@@ -1890,7 +1932,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.putAll(m);
- assertEquals(3, (int) cache.get("key3"));
+ assertEquals(3, (int)cache.get("key3"));
assertEquals(4, (int)cache.get("key4"));
}
@@ -2215,7 +2257,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/**
- * @param inTx In tx flag.
+ * @param inTx In tx flag.
* @throws Exception If failed.
*/
private void checkPutxIfAbsentAsync(boolean inTx) throws Exception {
@@ -2857,7 +2899,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @return Count of entries to be removed in removeAll() test.
*/
- protected long hugeRemoveAllEntryCount(){
+ protected long hugeRemoveAllEntryCount() {
return 1000L;
}
@@ -3627,7 +3669,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNotNull(curEntryTtl.get1());
assertNotNull(curEntryTtl.get2());
- assertEquals(ttl, (long) curEntryTtl.get1());
+ assertEquals(ttl, (long)curEntryTtl.get1());
assertTrue(curEntryTtl.get2() > startTime);
expireTimes[i] = curEntryTtl.get2();
@@ -3656,7 +3698,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNotNull(curEntryTtl.get1());
assertNotNull(curEntryTtl.get2());
- assertEquals(ttl, (long) curEntryTtl.get1());
+ assertEquals(ttl, (long)curEntryTtl.get1());
assertTrue(curEntryTtl.get2() > startTime);
expireTimes[i] = curEntryTtl.get2();
@@ -3685,7 +3727,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNotNull(curEntryTtl.get1());
assertNotNull(curEntryTtl.get2());
- assertEquals(ttl, (long) curEntryTtl.get1());
+ assertEquals(ttl, (long)curEntryTtl.get1());
assertTrue(curEntryTtl.get2() > startTime);
expireTimes[i] = curEntryTtl.get2();
@@ -3897,7 +3939,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.localPromote(Collections.singleton(k2));
- assertEquals((Integer) 2, cache.localPeek(k2, ONHEAP_PEEK_MODES));
+ assertEquals((Integer)2, cache.localPeek(k2, ONHEAP_PEEK_MODES));
cnt++;
}
@@ -5021,7 +5063,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @param keys Keys list.
* @param txConcurrency Concurrency mode.
* @param txIsolation Isolation mode.
- *
* @throws Exception If failed.
*/
private void checkSkipStoreWithTransaction(IgniteCache<String, Integer> cache,
@@ -5030,8 +5071,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
List<String> keys,
TransactionConcurrency txConcurrency,
TransactionIsolation txIsolation)
- throws Exception
- {
+ throws Exception {
info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']');
cache.removeAll(data.keySet());
@@ -5043,10 +5083,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
// Several put check.
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
- for (String key: keys)
+ for (String key : keys)
cacheSkipStore.put(key, val);
- for (String key: keys) {
+ for (String key : keys) {
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
assertFalse(storeStgy.isInStore(key));
@@ -5055,7 +5095,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
tx.commit();
}
- for (String key: keys) {
+ for (String key : keys) {
assertEquals(val, cacheSkipStore.get(key));
assertEquals(val, cache.get(key));
assertFalse(storeStgy.isInStore(key));
@@ -5070,7 +5110,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
tx.commit();
}
- for (String key: keys) {
+ for (String key : keys) {
val = data.get(key);
assertEquals(val, cacheSkipStore.get(key));
@@ -5086,7 +5126,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
tx.commit();
}
- for (String key: keys) {
+ for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertTrue(storeStgy.isInStore(key));
@@ -5100,7 +5140,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
cache.putAll(data);
- for (String key: keys) {
+ for (String key : keys) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
@@ -5108,7 +5148,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.removeAll(data.keySet());
- for (String key: keys) {
+ for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
@@ -5135,7 +5175,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.putAll(subMap);
- for (String key: keys) {
+ for (String key : keys) {
assertNotNull(cacheSkipStore.get(key));
assertNotNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
@@ -5162,7 +5202,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.removeAll(data.keySet());
- for (String key: keys) {
+ for (String key : keys) {
assertNull(cacheSkipStore.get(key));
assertNull(cache.get(key));
assertFalse(storeStgy.isInStore(key));
@@ -5257,7 +5297,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @param cache Cache instance.
* @param cacheSkipStore Cache skip store projection.
- *
* @throws Exception If failed.
*/
private void checkEmpty(IgniteCache<String, Integer> cache, IgniteCache<String, Integer> cacheSkipStore)
@@ -5426,6 +5465,155 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/**
+ * Runs the entry processor resource injection checks on the cache and on its {@code withAsync()} facade,
+ * first outside of a transaction and then, if transactions are enabled, inside explicit transactions.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTransformResourceInjection() throws Exception {
+ IgniteCache<String, Integer> plainCache = jcache();
+ Ignite node = ignite(0);
+
+ doTransformResourceInjection(node, plainCache);
+ doTransformResourceInjection(node, plainCache.withAsync());
+
+ if (!txEnabled())
+ return;
+
+ doTransformResourceInjectionInTx(node, plainCache);
+ doTransformResourceInjectionInTx(node, plainCache.withAsync());
+ }
+
+ /**
+ * Repeats the resource injection checks inside an explicit transaction for every combination of
+ * transaction concurrency and isolation, committing each transaction after the checks.
+ *
+ * @param ignite Node used to start transactions.
+ * @param cache Cache to invoke entry processors on.
+ * @throws Exception If failed.
+ */
+ private void doTransformResourceInjectionInTx(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
+ for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation txIsolation : TransactionIsolation.values()) {
+ try (Transaction tx = ignite.transactions().txStart(txConcurrency, txIsolation)) {
+ doTransformResourceInjection(ignite, cache);
+
+ tx.commit();
+ }
+ }
+ }
+ }
+
+ /**
+ * Registers a remote listener for cache read events on the node's cluster group, runs the invoke and
+ * invokeAll resource injection checks and stops the listener afterwards, even if a check fails.
+ *
+ * @param ignite Node.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void doTransformResourceInjection(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
+ final Collection<ResourceType> expRsrcs = Arrays.asList(
+ ResourceType.IGNITE_INSTANCE,
+ ResourceType.CACHE_NAME,
+ ResourceType.LOGGER,
+ ResourceType.SERVICE);
+
+ IgniteEvents evts = ignite.events(ignite.cluster());
+
+ UUID lsnrId = evts.remoteListen(new CacheEventListener(), null, EventType.EVT_CACHE_OBJECT_READ);
+
+ try {
+ checkResourceInjectionOnInvoke(cache, expRsrcs);
+
+ checkResourceInjectionOnInvokeAll(cache, expRsrcs);
+
+ checkResourceInjectionOnInvokeAllMap(cache, expRsrcs);
+ }
+ finally {
+ evts.stopRemoteListen(lsnrId);
+ }
+ }
+
+ /**
+ * Tests invokeAll method for map of pairs (key, entryProcessor): every processor result must be
+ * non-null and must report all required resources as injected.
+ *
+ * @param cache Cache.
+ * @param required Expected injected resources.
+ */
+ private void checkResourceInjectionOnInvokeAllMap(IgniteCache<String, Integer> cache,
+ Collection<ResourceType> required) {
+ Map<String, EntryProcessor<String, Integer, Integer>> map = new HashMap<>();
+
+ // Four random keys, each with its own processor instance.
+ for (int i = 0; i < 4; i++)
+ map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
+
+ Map<String, EntryProcessorResult<Integer>> results = cache.invokeAll(map);
+
+ // For an async cache the actual result is taken from the cache future.
+ if (cache.isAsync())
+ results = cache.<Map<String, EntryProcessorResult<Integer>>>future().get();
+
+ assertEquals(map.size(), results.size());
+
+ for (EntryProcessorResult<Integer> res : results.values()) {
+ Integer flags = res.get();
+
+ // Fail with a readable message instead of an NPE on unboxing when the processor returned null.
+ assertNotNull("Processor result is null", flags);
+
+ Collection<ResourceType> notInjected = ResourceInfoSet.valueOf(flags).notInjected(required);
+
+ if (!notInjected.isEmpty())
+ fail("Can't inject resource(s): " + Arrays.toString(notInjected.toArray()));
+ }
+ }
+
+ /**
+ * Tests invokeAll method for set of keys: every processor result must be non-null and must report
+ * all required resources as injected.
+ *
+ * @param cache Cache.
+ * @param required Expected injected resources.
+ */
+ private void checkResourceInjectionOnInvokeAll(IgniteCache<String, Integer> cache,
+ Collection<ResourceType> required) {
+ Set<String> keys = new HashSet<>(Arrays.asList(UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString()));
+
+ Map<String, EntryProcessorResult<Integer>> results = cache.invokeAll(keys,
+ new ResourceInjectionEntryProcessor());
+
+ // For an async cache the actual result is taken from the cache future.
+ if (cache.isAsync())
+ results = cache.<Map<String, EntryProcessorResult<Integer>>>future().get();
+
+ assertEquals(keys.size(), results.size());
+
+ for (EntryProcessorResult<Integer> res : results.values()) {
+ Integer flags = res.get();
+
+ // Fail with a readable message instead of an NPE on unboxing when the processor returned null.
+ assertNotNull("Processor result is null", flags);
+
+ Collection<ResourceType> notInjected = ResourceInfoSet.valueOf(flags).notInjected(required);
+
+ if (!notInjected.isEmpty())
+ fail("Can't inject resource(s): " + Arrays.toString(notInjected.toArray()));
+ }
+ }
+
+ /**
+ * Checks resource injection for {@code invoke} on a single random key.
+ *
+ * @param cache Cache.
+ * @param required Expected injected resources.
+ */
+ private void checkResourceInjectionOnInvoke(IgniteCache<String, Integer> cache,
+ Collection<ResourceType> required) {
+ String rndKey = UUID.randomUUID().toString();
+
+ Integer injectedFlags = cache.invoke(rndKey, new ResourceInjectionEntryProcessor());
+
+ // For an async cache the actual result is taken from the cache future.
+ if (cache.isAsync())
+ injectedFlags = cache.<Integer>future().get();
+
+ assertNotNull("Processor result is null", injectedFlags);
+
+ Collection<ResourceType> missing = ResourceInfoSet.valueOf(injectedFlags).notInjected(required);
+
+ if (missing.isEmpty())
+ return;
+
+ fail("Can't inject resource(s): " + Arrays.toString(missing.toArray()));
+ }
+
+ /**
* Sets given value, returns old value.
*/
public static final class SetValueProcessor implements EntryProcessor<String, Integer, Integer> {
@@ -5440,7 +5628,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/** {@inheritDoc} */
- @Override public Integer process(MutableEntry<String, Integer> entry, Object... arguments) throws EntryProcessorException {
+ @Override public Integer process(MutableEntry<String, Integer> entry,
+ Object... arguments) throws EntryProcessorException {
Integer val = entry.getValue();
entry.setValue(newVal);
@@ -5498,6 +5687,86 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
*
*/
+ public static class ResourceInjectionEntryProcessor extends ResourceInjectionEntryProcessorBase<String, Integer> {
+ /** Ignite instance passed to {@link #setIgnite(Ignite)}. */
+ protected transient Ignite ignite;
+
+ /** Cache name passed to {@link #setCacheName(String)}. */
+ protected transient String cacheName;
+
+ /** Logger passed to {@link #setLoggerResource(IgniteLogger)}. */
+ protected transient IgniteLogger log;
+
+ /** Service passed to {@link #setDummyService(DummyService)}. */
+ protected transient DummyService svc;
+
+ /**
+ * Setter annotated for Ignite instance injection. Asserts the argument is not {@code null} and flags
+ * {@link ResourceType#IGNITE_INSTANCE} in the info set.
+ *
+ * @param ignite Ignite.
+ */
+ @IgniteInstanceResource
+ public void setIgnite(Ignite ignite) {
+ assert ignite != null;
+
+ checkSet();
+
+ infoSet.set(ResourceType.IGNITE_INSTANCE, true);
+
+ this.ignite = ignite;
+ }
+
+ /**
+ * Setter annotated for cache name injection. Flags {@link ResourceType#CACHE_NAME} in the info set;
+ * unlike the other setters it does not assert that the argument is non-null.
+ *
+ * @param cacheName Cache name.
+ */
+ @CacheNameResource
+ public void setCacheName(String cacheName) {
+ checkSet();
+
+ infoSet.set(ResourceType.CACHE_NAME, true);
+
+ this.cacheName = cacheName;
+ }
+
+ /**
+ * Setter annotated for logger injection. Asserts the argument is not {@code null} and flags
+ * {@link ResourceType#LOGGER} in the info set.
+ *
+ * @param log Logger.
+ */
+ @LoggerResource
+ public void setLoggerResource(IgniteLogger log) {
+ assert log != null;
+
+ checkSet();
+
+ infoSet.set(ResourceType.LOGGER, true);
+
+ this.log = log;
+ }
+
+ /**
+ * Setter annotated for injection of the service named {@code SERVICE_NAME1}. Asserts the argument is
+ * not {@code null} and flags {@link ResourceType#SERVICE} in the info set.
+ *
+ * @param svc Service.
+ */
+ @ServiceResource(serviceName = SERVICE_NAME1)
+ public void setDummyService(DummyService svc) {
+ assert svc != null;
+
+ checkSet();
+
+ infoSet.set(ResourceType.SERVICE, true);
+
+ this.svc = svc;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Overwrites the entry value with a random number added to the previous value (or to 0 if there was
+ * none) and then returns the result of the base implementation.
+ */
+ @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+ Integer oldVal = e.getValue();
+
+ e.setValue(ThreadLocalRandom.current().nextInt() + (oldVal == null ? 0 : oldVal));
+
+ return super.process(e, args);
+ }
+ }
+
+ /**
+ *
+ */
private static class CheckEntriesTask extends TestIgniteIdxRunnable {
/** Keys. */
private final Collection<String> keys;
@@ -5674,6 +5943,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*
*/
private static class SwapEvtsLocalListener implements IgnitePredicate<Event> {
+ /** */
@LoggerResource
private IgniteLogger log;
@@ -5711,13 +5981,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
}
+ /**
+ *
+ */
private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable {
+ /** */
private final int cnt;
+ /**
+ * @param cnt Keys count.
+ */
public CheckEntriesDeletedTask(int cnt) {
this.cnt = cnt;
}
+ /** {@inheritDoc} */
@Override public void run(int idx) throws Exception {
for (int i = 0; i < cnt; i++) {
String key = String.valueOf(i);
@@ -5816,4 +6094,64 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
return val;
}
}
+
+ /**
+ * Dummy service interface; the type of the service injected into
+ * {@link ResourceInjectionEntryProcessor#setDummyService(DummyService)}.
+ */
+ public interface DummyService {
+ /**
+ * No-op method; {@link DummyServiceImpl} implements it with an empty body.
+ */
+ public void noop();
+ }
+
+ /**
+ * No-op test service: {@link #noop()} is empty and the service lifecycle callbacks only print
+ * the service name to standard output.
+ */
+ public static class DummyServiceImpl implements DummyService, Service {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void noop() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel(ServiceContext ctx) {
+ System.out.println("Cancelling service: " + ctx.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(ServiceContext ctx) throws Exception {
+ System.out.println("Initializing service: " + ctx.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(ServiceContext ctx) {
+ System.out.println("Executing service: " + ctx.name());
+ }
+ }
+
+ /**
+ * Cache event listener usable both as a two-argument listener and as a single-argument predicate.
+ * Either form appends the event to {@link #evts} and returns {@code true}.
+ */
+ public static class CacheEventListener implements IgniteBiPredicate<UUID, CacheEvent>, IgnitePredicate<CacheEvent> {
+ /** Events received so far, in arrival order. */
+ public final LinkedBlockingQueue<CacheEvent> evts = new LinkedBlockingQueue<>();
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ evts.add(evt);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(CacheEvent evt) {
+ evts.add(evt);
+
+ return true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index d58e560..af31635 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -17,9 +17,15 @@
package org.apache.ignite.internal.processors.cache;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -80,6 +86,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
int cnt = gridCount();
assert cnt >= 1 : "At least one grid must be started";
@@ -188,7 +196,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
assert jcache().unwrap(Ignite.class).transactions().tx() == null;
assertEquals("Cache is not empty", 0, jcache().localSize(CachePeekMode.ALL));
- storeStgy.resetStore();
+ if (storeStgy != null)
+ storeStgy.resetStore();
}
/** {@inheritDoc} */
@@ -410,20 +419,20 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
*/
protected static IgnitePredicate<Cache.Entry<String, Integer>> entryKeyFilter =
new P1<Cache.Entry<String, Integer>>() {
- @Override public boolean apply(Cache.Entry<String, Integer> entry) {
- return entry.getKey().contains("key");
- }
- };
+ @Override public boolean apply(Cache.Entry<String, Integer> entry) {
+ return entry.getKey().contains("key");
+ }
+ };
/**
* Filters cache entry projections leaving only ones with keys not containing 'key'.
*/
protected static IgnitePredicate<Cache.Entry<String, Integer>> entryKeyFilterInv =
new P1<Cache.Entry<String, Integer>>() {
- @Override public boolean apply(Cache.Entry<String, Integer> entry) {
- return !entry.getKey().contains("key");
- }
- };
+ @Override public boolean apply(Cache.Entry<String, Integer> entry) {
+ return !entry.getKey().contains("key");
+ }
+ };
/**
* Filters cache entry projections leaving only ones with values less than 50.
@@ -528,4 +537,117 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
}
}
+ /**
+ * Kinds of injectable resources tracked by {@link ResourceInfoSet}. The ordinal of a constant is used
+ * there as its bit position, so reordering constants changes the encoded value.
+ */
+ protected enum ResourceType {
+ /** Ignite instance. */
+ IGNITE_INSTANCE,
+
+ /** Cache name. */
+ CACHE_NAME,
+
+ /** Spring application context. */
+ SPRING_APPLICATION_CONTEXT,
+
+ /** Logger. */
+ LOGGER,
+
+ /** Service. */
+ SERVICE,
+
+ /** Spring bean. */
+ SPRING_BEAN,
+
+ }
+
+ /**
+ * Bit set describing which resources were injected: bit {@code n} is set when the {@link ResourceType}
+ * with ordinal {@code n} is marked as injected.
+ */
+ protected static class ResourceInfoSet {
+ /** Encoded flags, one bit per resource type. */
+ int val;
+
+ /** Creates a set with no resource marked. */
+ public ResourceInfoSet() {
+ this(0);
+ }
+
+ /**
+ * @param val Encoded flags.
+ */
+ public ResourceInfoSet(int val) {
+ this.val = val;
+ }
+
+ /**
+ * @param val Encoded flags.
+ * @return New set wrapping the given flags.
+ */
+ public static ResourceInfoSet valueOf(int val) {
+ return new ResourceInfoSet(val);
+ }
+
+ /**
+ * @return Encoded flags.
+ */
+ public int getValue() {
+ return val;
+ }
+
+ /**
+ * @param type Type.
+ * @param injected {@code True} to mark the type as injected, {@code false} to clear the mark.
+ * @return {@code this} for chaining.
+ */
+ public ResourceInfoSet set(ResourceType type, boolean injected) {
+ int bit = bitOf(type);
+
+ val = injected ? (val | bit) : (val & ~bit);
+
+ return this;
+ }
+
+ /**
+ * Marks the type as injected when the given reference is not {@code null}, clears the mark otherwise.
+ *
+ * @param type Type.
+ * @param toCheck Reference to test for {@code null}.
+ * @return {@code this} for chaining.
+ * @see #set(ResourceType, boolean)
+ */
+ public ResourceInfoSet set(ResourceType type, Object toCheck) {
+ boolean injected = toCheck != null;
+
+ return set(type, injected);
+ }
+
+ /**
+ * @param exp Resource types expected to be injected.
+ * @return Those of the expected types that are not marked in this set, in the iteration order of
+ * {@code exp}; an empty list if all of them are marked.
+ */
+ public Collection<ResourceType> notInjected(Collection<ResourceType> exp) {
+ ArrayList<ResourceType> missing = null;
+
+ for (ResourceType type : exp) {
+ if ((val & bitOf(type)) != 0)
+ continue;
+
+ // Allocate lazily: nothing is created when every expected type is marked.
+ if (missing == null)
+ missing = new ArrayList<>();
+
+ missing.add(type);
+ }
+
+ if (missing == null)
+ return Collections.<ResourceType>emptyList();
+
+ return missing;
+ }
+
+ /**
+ * @param type Type.
+ * @return Single-bit mask whose position is the ordinal of the type.
+ */
+ private static int bitOf(ResourceType type) {
+ return 1 << type.ordinal();
+ }
+ }
+
+ /**
+ * Base entry processor whose result is the encoded set of injection flags collected in {@link #infoSet}.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+ protected abstract static class ResourceInjectionEntryProcessorBase<K, V>
+ implements EntryProcessor<K, V, Integer>, Serializable {
+ /** Injection flags; stays {@code null} until {@link #checkSet()} is called. */
+ protected transient ResourceInfoSet infoSet;
+
+ /**
+ * @param e Entry (not used here).
+ * @param args Arguments (not used here).
+ * @return Encoded injection flags, or {@code null} if the info set was never created.
+ */
+ @Override public Integer process(MutableEntry<K, V> e, Object... args) {
+ if (infoSet == null)
+ return null;
+
+ return infoSet.getValue();
+ }
+
+ /** Creates the info set on first use; does nothing if it already exists. */
+ protected void checkSet() {
+ if (infoSet != null)
+ return;
+
+ infoSet = new ResourceInfoSet();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
index a3caba6..f36b060 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -74,9 +75,6 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
/** Cache name. */
private static final String CACHE_NAME = "cache";
- /** Closure name. */
- private static final String CLO_NAME = Transformer.class.getName();
-
/** Key 1. */
private Integer key1;
@@ -98,7 +96,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
/** Caches. */
private IgniteCache<Integer, Integer>[] caches;
- /** Recorded events.*/
+ /** Recorded events. */
private ConcurrentHashSet<CacheEvent> evts;
/** Cache mode. */
@@ -477,13 +475,25 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
caches[0].invoke(key1, new Transformer());
- checkEventNodeIdsStrict(primaryIdsForKeys(key1));
+ checkEventNodeIdsStrict(Transformer.class.getName(), primaryIdsForKeys(key1));
assert evts.isEmpty();
caches[0].invokeAll(keys, new Transformer());
- checkEventNodeIdsStrict(primaryIdsForKeys(key1, key2));
+ checkEventNodeIdsStrict(Transformer.class.getName(), primaryIdsForKeys(key1, key2));
+
+ assert evts.isEmpty();
+
+ caches[0].invoke(key1, new TransformerWithInjection());
+
+ checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), primaryIdsForKeys(key1));
+
+ assert evts.isEmpty();
+
+ caches[0].invokeAll(keys, new TransformerWithInjection());
+
+ checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), primaryIdsForKeys(key1, key2));
}
/**
@@ -492,7 +502,6 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
* @param cacheMode Cache mode.
* @param txConcurrency TX concurrency.
* @param txIsolation TX isolation.
- *
* @throws Exception If failed.
*/
private void checkTx(CacheMode cacheMode, TransactionConcurrency txConcurrency,
@@ -505,13 +514,29 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
System.out.println("AFTER: " + evts.size());
- checkEventNodeIdsStrict(idsForKeys(key1));
+ checkEventNodeIdsStrict(Transformer.class.getName(), idsForKeys(key1));
assert evts.isEmpty();
caches[0].invokeAll(keys, new Transformer());
- checkEventNodeIdsStrict(idsForKeys(key1, key2));
+ checkEventNodeIdsStrict(Transformer.class.getName(), idsForKeys(key1, key2));
+
+ assert evts.isEmpty();
+
+ System.out.println("BEFORE: " + evts.size());
+
+ caches[0].invoke(key1, new TransformerWithInjection());
+
+ System.out.println("AFTER: " + evts.size());
+
+ checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), idsForKeys(key1));
+
+ assert evts.isEmpty();
+
+ caches[0].invokeAll(keys, new TransformerWithInjection());
+
+ checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), idsForKeys(key1, key2));
}
/**
@@ -572,9 +597,10 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
/**
* Ensure that events were recorded on the given nodes.
*
+ * @param cClsName Entry processor class name.
* @param ids Event IDs.
*/
- private void checkEventNodeIdsStrict(UUID... ids) {
+ private void checkEventNodeIdsStrict(String cClsName, UUID... ids) {
if (ids == null)
assertTrue(evts.isEmpty());
else {
@@ -585,7 +611,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
for (CacheEvent evt : evts) {
if (F.eq(id, evt.node().id())) {
- assertEquals(CLO_NAME, evt.closureClassName());
+ assertEquals(cClsName, evt.closureClassName());
foundEvt = evt;
@@ -625,4 +651,22 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
return null;
}
}
+
+ /**
+ * Transform closure that increments the entry value by one and asserts that its
+ * {@code @IgniteInstanceResource} field is non-null when it runs.
+ */
+ private static class TransformerWithInjection implements EntryProcessor<Integer, Integer, Void>, Serializable {
+ /** Ignite instance; the field is annotated for injection and checked in {@link #process}. */
+ @IgniteInstanceResource
+ private transient Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ assert ignite != null;
+
+ e.setValue(e.getValue() + 1);
+
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
index e78f329..bcf4ccd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
@@ -70,6 +70,8 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
stores = Collections.synchronizedList(new ArrayList<GridCacheTestStore>());
startGridsMultiThreaded(GRID_CNT);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/spring/src/test/java/org/apache/ignite/internal/processors/resource/GridTransformSpringInjectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/internal/processors/resource/GridTransformSpringInjectionSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/internal/processors/resource/GridTransformSpringInjectionSelfTest.java
new file mode 100644
index 0000000..cc61514
--- /dev/null
+++ b/modules/spring/src/test/java/org/apache/ignite/internal/processors/resource/GridTransformSpringInjectionSelfTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.resource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSpring;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Checks that Spring resources get injected into entry processors run via cache invoke() and invokeAll().
+ */
+public class GridTransformSpringInjectionSelfTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeTestsStarted() throws Exception {
+ IgniteSpring.start(getConfiguration(getTestGridName(0)),
+ new ClassPathXmlApplicationContext("/org/apache/ignite/internal/processors/resource/spring-resource.xml"));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTransformResourceInjection() throws Exception {
+ Ignite grid = grid(0);
+
+ IgniteCache<String, Integer> cache = grid.createCache(cacheConfiguration(ATOMIC));
+
+ try {
+ doTransformResourceInjection(cache);
+ }
+ finally {
+ cache.destroy();
+ }
+
+ cache = grid.createCache(cacheConfiguration(TRANSACTIONAL));
+
+ try {
+ doTransformResourceInjection(cache); // Outside of an explicit transaction.
+
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ IgniteTransactions txs = grid.transactions();
+
+ try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ doTransformResourceInjection(cache); // Inside an explicit transaction.
+
+ tx.commit();
+ }
+ }
+ }
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @return Cache configuration named after this test class and using the given atomicity mode.
+ */
+ private CacheConfiguration<String, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) {
+ CacheConfiguration<String, Integer> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName(getClass().getSimpleName());
+ ccfg.setAtomicityMode(atomicityMode);
+
+ return ccfg;
+ }
+
+ /**
+ * @param cache Cache to run the entry processor on (single-key invoke, then invokeAll on four random keys).
+ * @throws Exception If failed.
+ */
+ private void doTransformResourceInjection(IgniteCache<String, Integer> cache) throws Exception {
+ final Collection<ResourceType> required = Arrays.asList(
+ ResourceType.SPRING_APPLICATION_CONTEXT,
+ ResourceType.SPRING_BEAN);
+
+ Integer flags = cache.invoke(UUID.randomUUID().toString(), new SpringResourceInjectionEntryProcessor());
+
+ assertTrue("Processor result is null", flags != null);
+
+ log.info("Injection flag: " + Integer.toBinaryString(flags));
+
+ Collection<ResourceType> notInjected = ResourceInfoSet.valueOf(flags).notInjected(required);
+
+ if (!notInjected.isEmpty())
+ fail("Can't inject resource(s): " + Arrays.toString(notInjected.toArray()));
+
+ Set<String> keys = new HashSet<>(Arrays.asList(UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString()));
+
+ Map<String, EntryProcessorResult<Integer>> results = cache.invokeAll(keys,
+ new SpringResourceInjectionEntryProcessor());
+
+ assertEquals(keys.size(), results.size());
+
+ for (EntryProcessorResult<Integer> res : results.values()) {
+ Collection<ResourceType> notInjected1 = ResourceInfoSet.valueOf(res.get()).notInjected(required);
+
+ if (!notInjected1.isEmpty())
+ fail("Can't inject resource(s): " + Arrays.toString(notInjected1.toArray()));
+ }
+ }
+
+ /**
+ * Entry processor that flags in {@code infoSet} each Spring resource injected into it.
+ */
+ static class SpringResourceInjectionEntryProcessor extends ResourceInjectionEntryProcessorBase<String, Integer> {
+ /** Injected Spring application context. */
+ private transient ApplicationContext appCtx;
+
+ /** Injected Spring bean ("dummyResourceBean"). */
+ private transient GridSpringResourceInjectionSelfTest.DummyResourceBean dummyBean;
+
+ /**
+ * @param appCtx Spring application context being injected; must not be {@code null}.
+ */
+ @SpringApplicationContextResource
+ public void setApplicationContext(ApplicationContext appCtx) {
+ assert appCtx != null;
+
+ checkSet();
+
+ infoSet.set(ResourceType.SPRING_APPLICATION_CONTEXT, true);
+
+ this.appCtx = appCtx;
+ }
+
+ /**
+ * @param dummyBean Resource bean being injected; must not be {@code null}.
+ */
+ @SpringResource(resourceName = "dummyResourceBean")
+ public void setDummyBean(GridSpringResourceInjectionSelfTest.DummyResourceBean dummyBean) {
+ assert dummyBean != null;
+
+ checkSet();
+
+ infoSet.set(ResourceType.SPRING_BEAN, true);
+
+ this.dummyBean = dummyBean;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index cd5645d..67b117d 100644
--- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@ -18,17 +18,18 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.cache.spring.GridSpringCacheManagerSelfTest;
+import org.apache.ignite.cache.spring.SpringCacheManagerContextInjectionTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactorySelfTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactorySelfTest;
import org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListenerSelfTest;
import org.apache.ignite.internal.GridFactorySelfTest;
import org.apache.ignite.internal.GridSpringBeanSerializationSelfTest;
import org.apache.ignite.internal.IgniteDynamicCacheConfigTest;
+import org.apache.ignite.internal.processors.resource.GridTransformSpringInjectionSelfTest;
import org.apache.ignite.p2p.GridP2PUserVersionChangeSelfTest;
-import org.apache.ignite.cache.spring.GridSpringCacheManagerSelfTest;
import org.apache.ignite.spring.IgniteExcludeInConfigurationTest;
import org.apache.ignite.spring.IgniteStartFromStreamConfigurationTest;
-import org.apache.ignite.cache.spring.SpringCacheManagerContextInjectionTest;
import org.apache.ignite.spring.injection.GridServiceInjectionSpringResourceTest;
import org.apache.ignite.transactions.spring.GridSpringTransactionManagerSelfTest;
import org.apache.ignite.transactions.spring.SpringTransactionManagerContextInjectionTest;
@@ -70,6 +71,8 @@ public class IgniteSpringTestSuite extends TestSuite {
suite.addTestSuite(GridServiceInjectionSpringResourceTest.class);
+ suite.addTestSuite(GridTransformSpringInjectionSelfTest.class);
+
suite.addTestSuite(SpringCacheManagerContextInjectionTest.class);
suite.addTestSuite(SpringTransactionManagerContextInjectionTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionBenchmark.java
new file mode 100644
index 0000000..ef9d17b
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionBenchmark.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+
+import javax.cache.processor.MutableEntry;
+import java.util.Map;
+
+/**
+ * Ignite benchmark that performs invoke operations using an entry processor with an injected Ignite instance.
+ */
+public class IgniteInvokeWithInjectionBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ int key = nextRandom(args.range());
+
+ cache.invoke(key, new SetValueEntryProcessor(new SampleValue(key)));
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("atomic");
+ }
+
+ /**
+ * Entry processor that asserts the Ignite instance was injected and then sets the entry to a fixed value.
+ */
+ public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Object, Object> {
+ /** Ignite instance, injected via {@link IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ private transient Ignite ignite;
+
+ /** Value to set into the entry. */
+ private Object val;
+
+ /**
+ * @param val Value to set into the processed entry.
+ */
+ public SetValueEntryProcessor(Object val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Integer, Object> entry, Object... args) {
+ assert ignite != null;
+
+ entry.setValue(val);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionTxBenchmark.java
new file mode 100644
index 0000000..2df93ee
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeWithInjectionTxBenchmark.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import org.apache.ignite.IgniteCache;
+
+/**
+ * Ignite benchmark that performs invoke operations with injection, run against the cache named "tx".
+ */
+public class IgniteInvokeWithInjectionTxBenchmark extends IgniteInvokeWithInjectionBenchmark {
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("tx");
+ }
+}
[8/9] ignite git commit: Fix for C++ tests.
Posted by ak...@apache.org.
Fix for C++ tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31dbc5d6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31dbc5d6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31dbc5d6
Branch: refs/heads/master
Commit: 31dbc5d65f8ea51010d2129e7c6e9a27acbf8528
Parents: 4a259da
Author: isapego <is...@gridgain.com>
Authored: Tue Aug 30 17:28:34 2016 +0300
Committer: isapego <is...@gridgain.com>
Committed: Tue Aug 30 17:28:34 2016 +0300
----------------------------------------------------------------------
.../odbc-test/config/queries-test-noodbc.xml | 48 +++++++--------
.../cpp/odbc-test/config/queries-test.xml | 50 +++++++--------
.../cpp/odbc-test/project/vs/odbc-test.vcxproj | 1 +
.../project/vs/odbc-test.vcxproj.filters | 3 +
.../cpp/odbc-test/src/queries_test.cpp | 64 +++-----------------
.../odbc-test/src/sql_test_suite_fixture.cpp | 14 ++---
6 files changed, 68 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/31dbc5d6/modules/platforms/cpp/odbc-test/config/queries-test-noodbc.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/config/queries-test-noodbc.xml b/modules/platforms/cpp/odbc-test/config/queries-test-noodbc.xml
index 18447c2..db19669 100644
--- a/modules/platforms/cpp/odbc-test/config/queries-test-noodbc.xml
+++ b/modules/platforms/cpp/odbc-test/config/queries-test-noodbc.xml
@@ -28,6 +28,7 @@
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="localHost" value="127.0.0.1"/>
<property name="connectorConfiguration"><null/></property>
+ <property name="odbcConfiguration"><null/></property>
<property name="cacheConfiguration">
<list>
@@ -38,14 +39,14 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
<!-- Configure type metadata to enable queries. -->
- <property name="queryEntities">
- <list>
- <bean class="org.apache.ignite.cache.QueryEntity">
- <property name="keyType" value="java.lang.Long"/>
- <property name="valueType" value="TestType"/>
+ <property name="queryEntities">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryEntity">
+ <property name="keyType" value="java.lang.Long"/>
+ <property name="valueType" value="TestType"/>
- <property name="fields">
- <map>
+ <property name="fields">
+ <map>
<entry key="i8Field" value="java.lang.Byte"/>
<entry key="i16Field" value="java.lang.Short"/>
<entry key="i32Field" value="java.lang.Integer"/>
@@ -57,22 +58,22 @@
<entry key="guidField" value="java.util.UUID"/>
<entry key="dateField" value="java.util.Date"/>
<entry key="timestampField" value="java.sql.Timestamp"/>
- </map>
- </property>
+ </map>
+ </property>
- <property name="indexes">
- <list>
- <bean class="org.apache.ignite.cache.QueryIndex">
- <constructor-arg value="i32Field"/>
- </bean>
- <bean class="org.apache.ignite.cache.QueryIndex">
- <constructor-arg value="i64Field"/>
- </bean>
- </list>
- </property>
- </bean>
- </list>
- </property>
+ <property name="indexes">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryIndex">
+ <constructor-arg value="i32Field"/>
+ </bean>
+ <bean class="org.apache.ignite.cache.QueryIndex">
+ <constructor-arg value="i64Field"/>
+ </bean>
+ </list>
+ </property>
+ </bean>
+ </list>
+ </property>
</bean>
</list>
</property>
@@ -86,8 +87,7 @@
instead os static IP based discovery.
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
- <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
http://git-wip-us.apache.org/repos/asf/ignite/blob/31dbc5d6/modules/platforms/cpp/odbc-test/config/queries-test.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/config/queries-test.xml b/modules/platforms/cpp/odbc-test/config/queries-test.xml
index 54cb9be..26e6341 100644
--- a/modules/platforms/cpp/odbc-test/config/queries-test.xml
+++ b/modules/platforms/cpp/odbc-test/config/queries-test.xml
@@ -32,8 +32,8 @@
<!-- Enabling ODBC. -->
<property name="odbcConfiguration">
<bean class="org.apache.ignite.configuration.OdbcConfiguration">
- <property name="endpointAddress" value="127.0.0.1:11110"/>
- </bean>
+ <property name="endpointAddress" value="127.0.0.1:11110"/>
+ </bean>
</property>
<property name="cacheConfiguration">
@@ -43,16 +43,16 @@
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
-
+
<!-- Configure type metadata to enable queries. -->
- <property name="queryEntities">
- <list>
- <bean class="org.apache.ignite.cache.QueryEntity">
- <property name="keyType" value="java.lang.Long"/>
- <property name="valueType" value="TestType"/>
+ <property name="queryEntities">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryEntity">
+ <property name="keyType" value="java.lang.Long"/>
+ <property name="valueType" value="TestType"/>
- <property name="fields">
- <map>
+ <property name="fields">
+ <map>
<entry key="i8Field" value="java.lang.Byte"/>
<entry key="i16Field" value="java.lang.Short"/>
<entry key="i32Field" value="java.lang.Integer"/>
@@ -64,22 +64,22 @@
<entry key="guidField" value="java.util.UUID"/>
<entry key="dateField" value="java.util.Date"/>
<entry key="timestampField" value="java.sql.Timestamp"/>
- </map>
- </property>
+ </map>
+ </property>
- <property name="indexes">
- <list>
- <bean class="org.apache.ignite.cache.QueryIndex">
- <constructor-arg value="i32Field"/>
- </bean>
- <bean class="org.apache.ignite.cache.QueryIndex">
- <constructor-arg value="i64Field"/>
- </bean>
- </list>
- </property>
- </bean>
- </list>
- </property>
+ <property name="indexes">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryIndex">
+ <constructor-arg value="i32Field"/>
+ </bean>
+ <bean class="org.apache.ignite.cache.QueryIndex">
+ <constructor-arg value="i64Field"/>
+ </bean>
+ </list>
+ </property>
+ </bean>
+ </list>
+ </property>
</bean>
</list>
</property>
http://git-wip-us.apache.org/repos/asf/ignite/blob/31dbc5d6/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
index 0702047..cb5735f 100644
--- a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
+++ b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
@@ -203,6 +203,7 @@
</ProjectReference>
</ItemGroup>
<ItemGroup>
+ <None Include="..\..\config\queries-test-noodbc.xml" />
<None Include="..\..\config\queries-test.xml" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/31dbc5d6/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters
index 0a72640..270bdd6 100644
--- a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters
+++ b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters
@@ -125,5 +125,8 @@
<None Include="..\..\config\queries-test.xml">
<Filter>Configs</Filter>
</None>
+ <None Include="..\..\config\queries-test-noodbc.xml">
+ <Filter>Configs</Filter>
+ </None>
</ItemGroup>
</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/31dbc5d6/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
index ab59952..c907772 100644
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@ -60,35 +60,6 @@ struct QueriesTestSuiteFixture
*/
void Connect(const std::string& connectStr)
{
- IgniteConfiguration cfg;
-
- cfg.jvmOpts.push_back("-Xdebug");
- cfg.jvmOpts.push_back("-Xnoagent");
- cfg.jvmOpts.push_back("-Djava.compiler=NONE");
- cfg.jvmOpts.push_back("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005");
- cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError");
-
-#ifdef IGNITE_TESTS_32
- cfg.jvmInitMem = 256;
- cfg.jvmMaxMem = 768;
-#else
- cfg.jvmInitMem = 1024;
- cfg.jvmMaxMem = 4096;
-#endif
-
- char* cfgPath = getenv("IGNITE_NATIVE_TEST_ODBC_CONFIG_PATH");
-
- cfg.springCfgPath = std::string(cfgPath).append("/").append("queries-test.xml");
-
- IgniteError err;
-
- grid = Ignition::Start(cfg, &err);
-
- if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
- BOOST_FAIL(err.GetText());
-
- testCache = grid.GetCache<int64_t, TestType>("cache");
-
// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
@@ -141,7 +112,7 @@ struct QueriesTestSuiteFixture
SQLFreeHandle(SQL_HANDLE_ENV, env);
}
- static Ignite StartAdditionalNode(const char* name)
+ static Ignite StartNode(const char* name, const char* config)
{
IgniteConfiguration cfg;
@@ -163,43 +134,24 @@ struct QueriesTestSuiteFixture
BOOST_REQUIRE(cfgPath != 0);
- cfg.springCfgPath.assign(cfgPath).append("/queries-test.xml");
+ cfg.springCfgPath.assign(cfgPath).append("/").append(config);
IgniteError err;
return Ignition::Start(cfg, name);
}
+ static Ignite StartAdditionalNode(const char* name)
+ {
+ return StartNode(name, "queries-test-noodbc.xml");
+ }
+
/**
* Constructor.
*/
QueriesTestSuiteFixture() : testCache(0), env(NULL), dbc(NULL), stmt(NULL)
{
- IgniteConfiguration cfg;
-
- cfg.jvmOpts.push_back("-Xdebug");
- cfg.jvmOpts.push_back("-Xnoagent");
- cfg.jvmOpts.push_back("-Djava.compiler=NONE");
- cfg.jvmOpts.push_back("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005");
- cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError");
-
-#ifdef IGNITE_TESTS_32
- cfg.jvmInitMem = 256;
- cfg.jvmMaxMem = 768;
-#else
- cfg.jvmInitMem = 1024;
- cfg.jvmMaxMem = 4096;
-#endif
-
- char* cfgPath = getenv("IGNITE_NATIVE_TEST_ODBC_CONFIG_PATH");
-
- BOOST_REQUIRE(cfgPath != 0);
-
- cfg.springCfgPath.assign(cfgPath).append("/queries-test.xml");
-
- IgniteError err;
-
- grid = Ignition::Start(cfg, "NodeMain");
+ grid = StartNode("NodeMain", "queries-test.xml");
testCache = grid.GetCache<int64_t, TestType>("cache");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/31dbc5d6/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp b/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp
index 16e5ea0..69b4bfa 100644
--- a/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp
+++ b/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp
@@ -45,7 +45,7 @@ namespace ignite
char* cfgPath = getenv("IGNITE_NATIVE_TEST_ODBC_CONFIG_PATH");
- BOOST_REQUIRE(cfgPath != 0) ;
+ BOOST_REQUIRE(cfgPath != 0);
cfg.springCfgPath.assign(cfgPath).append("/queries-test.xml");
@@ -54,14 +54,14 @@ namespace ignite
grid = Ignition::Start(cfg, &err);
if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
- BOOST_FAIL(err.GetText()) ;
+ BOOST_FAIL(err.GetText()) ;
testCache = grid.GetCache<int64_t, TestType>("cache");
// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
- BOOST_REQUIRE(env != NULL) ;
+ BOOST_REQUIRE(env != NULL);
// We want ODBC 3 support
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);
@@ -69,10 +69,10 @@ namespace ignite
// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
- BOOST_REQUIRE(dbc != NULL) ;
+ BOOST_REQUIRE(dbc != NULL);
// Connect string
- SQLCHAR connectStr[] = "DRIVER={Apache Ignite};SERVER=localhost;PORT=10800;CACHE=cache";
+ SQLCHAR connectStr[] = "DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache";
SQLCHAR outstr[ODBC_BUFFER_SIZE];
SQLSMALLINT outstrlen;
@@ -85,13 +85,13 @@ namespace ignite
{
Ignition::Stop(grid.GetName(), true);
- BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_DBC, dbc)) ;
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_DBC, dbc));
}
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
- BOOST_REQUIRE(stmt != NULL) ;
+ BOOST_REQUIRE(stmt != NULL);
}
SqlTestSuiteFixture::~SqlTestSuiteFixture()
[6/9] ignite git commit: Revert wrong merge.
Posted by ak...@apache.org.
Revert wrong merge.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1ef150eb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1ef150eb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1ef150eb
Branch: refs/heads/master
Commit: 1ef150eba52eb63c2bfc3fafa0d036cf26be1c5b
Parents: fbbcaf4
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Aug 30 18:18:20 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Aug 30 18:18:20 2016 +0700
----------------------------------------------------------------------
.../ignite/visor/commands/cache/VisorCacheStopCommand.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef150eb/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala
index 1b55505..22fb89d 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala
@@ -19,7 +19,6 @@ package org.apache.ignite.visor.commands.cache
import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterNode}
import org.apache.ignite.visor.visor._
-
import org.apache.ignite.internal.visor.cache.VisorCacheStopTask
import org.apache.ignite.internal.visor.util.VisorTaskUtils._
@@ -102,7 +101,9 @@ class VisorCacheStopCommand {
return
}
- ask(s"Are you sure you want to stop cache: ${escapeName(cacheName)}? (y/n) [n]: ", "n") match {
+ val dflt = if (batchMode) "y" else "n"
+
+ ask(s"Are you sure you want to stop cache: ${escapeName(cacheName)}? (y/n) [$dflt]: ", dflt) match {
case "y" | "Y" =>
try {
executeRandom(grp, classOf[VisorCacheStopTask], cacheName)
[5/9] ignite git commit: IGNITE-3798: ODBC: Added literals support.
This closes #1005.
Posted by ak...@apache.org.
IGNITE-3798: ODBC: Added literals support. This closes #1005.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fbbcaf43
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fbbcaf43
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fbbcaf43
Branch: refs/heads/master
Commit: fbbcaf4322548f61d2f63bf5d4e8f6d5284e73d3
Parents: 3244a5c
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Aug 30 13:22:29 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Aug 30 13:22:29 2016 +0300
----------------------------------------------------------------------
.../processors/odbc/escape/OdbcEscapeUtils.java | 87 +++++++++++---------
.../odbc/OdbcEscapeSequenceSelfTest.java | 55 +++++++++++++
2 files changed, 105 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fbbcaf43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
index 27120d4..48d4296 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
@@ -69,60 +69,70 @@ public class OdbcEscapeUtils {
int plainPos = startPos;
int openPos = -1;
+ boolean insideLiteral = false;
+
LinkedList<OdbcEscapeParseResult> nested = null;
while (curPos < text.length()) {
char curChar = text.charAt(curPos);
- if (curChar == '{') {
- if (openPos == -1) {
- // Top-level opening brace. Append previous portion and remember current position.
- res.append(text, plainPos, curPos);
+ if (curChar == '\'') {
+ if (!insideLiteral)
+ insideLiteral = true;
+ else if (text.charAt(curPos - 1) != '\\')
+ insideLiteral = false;
+ }
+ else if (!insideLiteral) {
+ if (curChar == '{') {
+ if (openPos == -1) {
+ // Top-level opening brace. Append previous portion and remember current position.
+ res.append(text, plainPos, curPos);
- openPos = curPos;
- }
- else {
- // Nested opening brace -> perform recursion.
- OdbcEscapeParseResult nestedRes = parse0(text, curPos, true);
+ openPos = curPos;
+ }
+ else {
+ // Nested opening brace -> perform recursion.
+ OdbcEscapeParseResult nestedRes = parse0(text, curPos, true);
- if (nested == null)
- nested = new LinkedList<>();
+ if (nested == null)
+ nested = new LinkedList<>();
- nested.add(nestedRes);
+ nested.add(nestedRes);
- curPos += nestedRes.originalLength() - 1;
+ curPos += nestedRes.originalLength() - 1;
- plainPos = curPos + 1;
+ plainPos = curPos + 1;
+ }
}
- }
- else if (curChar == '}') {
- if (openPos == -1)
- // Close without open -> exception.
- throw new IgniteException("Malformed escape sequence " +
- "(closing curly brace without opening curly brace): " + text);
- else {
- String parseRes;
-
- if (nested == null)
- // Found sequence without nesting, process it.
- parseRes = parseEscapeSequence(text, openPos, curPos + 1 - openPos);
+ else if (curChar == '}') {
+ if (openPos == -1)
+ // Close without open -> exception.
+ throw new IgniteException("Malformed escape sequence " +
+ "(closing curly brace without opening curly brace): " + text);
else {
- // Special case to process nesting.
- String res0 = appendNested(text, openPos, curPos + 1, nested);
+ String parseRes;
- nested = null;
+ if (nested == null)
+ // Found sequence without nesting, process it.
+ parseRes = parseEscapeSequence(text, openPos, curPos + 1 - openPos);
+ else {
+ // Special case to process nesting.
+ String res0 = appendNested(text, openPos, curPos + 1, nested);
- parseRes = parseEscapeSequence(res0, 0, res0.length());
- }
+ nested = null;
- if (earlyExit)
- return new OdbcEscapeParseResult(startPos, curPos + 1 - startPos, parseRes);
- else
- res.append(parseRes);
+ parseRes = parseEscapeSequence(res0, 0, res0.length());
+ }
- openPos = -1;
+ if (earlyExit)
+ return new OdbcEscapeParseResult(startPos, curPos + 1 - startPos, parseRes);
+ else
+ res.append(parseRes);
- plainPos = curPos + 1;
+ openPos = -1;
+
+ plainPos = curPos + 1;
+ }
}
}
@@ -132,6 +142,9 @@ public class OdbcEscapeUtils {
if (openPos != -1)
throw new IgniteException("Malformed escape sequence (closing curly brace missing): " + text);
+ if (insideLiteral)
+ throw new IgniteException("Malformed literal expression (closing quote missing): " + text);
+
if (curPos > plainPos)
res.append(text, plainPos, curPos);
http://git-wip-us.apache.org/repos/asf/ignite/blob/fbbcaf43/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
index 4887a67..3fec7d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
@@ -486,6 +486,61 @@ public class OdbcEscapeSequenceSelfTest extends GridCommonAbstractTest {
}
/**
+ * Test that curly braces inside quoted literals are not treated as escape sequences.
+ */
+ public void testNonEscapeSequence() throws Exception {
+ check("'{fn test()}'", "'{fn test()}'");
+
+ check("select '{fn test()}'", "select '{fn test()}'");
+
+ check(
+ "select '{fn test()}' from table;",
+ "select '{fn test()}' from table;"
+ );
+
+ check(
+ "select test('arg') from table;",
+ "select {fn test('arg')} from table;"
+ );
+
+ check(
+ "select test('{fn func()}') from table;",
+ "select {fn test('{fn func()}')} from table;"
+ );
+
+ check(
+ "'{\\'some literal\\'}'",
+ "'{\\'some literal\\'}'"
+ );
+
+ check(
+ "select '{\\'some literal\\'}'",
+ "select '{\\'some literal\\'}'"
+ );
+
+ check(
+ "select '{\\'some literal\\'}' from table;",
+ "select '{\\'some literal\\'}' from table;"
+ );
+
+ check(
+ "select '{' + func() + '}' from table;",
+ "select '{' + {fn func()} + '}' from table;"
+ );
+
+ check(
+ "select '{\\'{fn test()}\\'}' from table;",
+ "select '{\\'{fn test()}\\'}' from table;"
+ );
+
+ checkFail("'{fn test()}");
+
+ checkFail("{fn func('arg)}");
+
+ checkFail("{fn func(arg')}");
+ }
+
+ /**
* Check parsing logic.
*
* @param exp Expected result.
[9/9] ignite git commit: Merge branches 'ignite-1.7.2' and 'master'.
Posted by ak...@apache.org.
Merge branches 'ignite-1.7.2' and 'master'.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f697876
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f697876
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f697876
Branch: refs/heads/master
Commit: 8f697876a3a3ab491434289864a17ea3c3f82ce9
Parents: d98cd30 31dbc5d
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Aug 31 10:29:53 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Aug 31 10:29:53 2016 +0700
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 15 +
.../processors/cache/CacheLazyEntry.java | 2 +
.../EntryProcessorResourceInjectorProxy.java | 105 ++++
.../processors/cache/GridCacheMapEntry.java | 13 +-
.../processors/cache/IgniteCacheProxy.java | 112 +++-
.../GridNearAtomicSingleUpdateFuture.java | 17 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 18 +-
.../processors/cache/query/CacheQuery.java | 11 +-
.../query/GridCacheDistributedQueryManager.java | 22 +-
.../cache/query/GridCacheLocalQueryManager.java | 3 +-
.../cache/query/GridCacheQueryAdapter.java | 69 ++-
.../cache/query/GridCacheQueryBean.java | 8 +-
.../cache/query/GridCacheQueryInfo.java | 8 +-
.../cache/query/GridCacheQueryManager.java | 125 ++--
.../cache/query/GridCacheQueryRequest.java | 6 +-
.../transactions/IgniteTxLocalAdapter.java | 5 +-
.../processors/odbc/escape/OdbcEscapeUtils.java | 115 ++--
.../processors/resource/GridResourceIoc.java | 438 ++++++++++----
.../resource/GridResourceProcessor.java | 396 ++++++-------
.../cache/GridCacheAbstractFullApiSelfTest.java | 393 +++++++++++--
.../cache/GridCacheAbstractSelfTest.java | 140 ++++-
.../GridCacheTransformEventSelfTest.java | 66 ++-
...ePartitionedBasicStoreMultiNodeSelfTest.java | 2 +
.../GridCacheQueryTransformerSelfTest.java | 570 +++++++++++++++++++
.../odbc/OdbcEscapeSequenceSelfTest.java | 164 +++++-
.../multijvm/IgniteCacheProcessProxy.java | 6 +
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../odbc-test/config/queries-test-noodbc.xml | 48 +-
.../cpp/odbc-test/config/queries-test.xml | 50 +-
.../cpp/odbc-test/project/vs/odbc-test.vcxproj | 1 +
.../project/vs/odbc-test.vcxproj.filters | 3 +
.../cpp/odbc-test/src/queries_test.cpp | 64 +--
.../odbc-test/src/sql_test_suite_fixture.cpp | 14 +-
.../GridTransformSpringInjectionSelfTest.java | 186 ++++++
.../testsuites/IgniteSpringTestSuite.java | 7 +-
.../commands/cache/VisorCacheStopCommand.scala | 5 +-
.../IgniteInvokeWithInjectionBenchmark.java | 74 +++
.../IgniteInvokeWithInjectionTxBenchmark.java | 30 +
39 files changed, 2632 insertions(+), 689 deletions(-)
----------------------------------------------------------------------
[4/9] ignite git commit: IGNITE-3742: ODBC: Added support for OUTER
JOIN escape sequence. This closes #1000.
Posted by ak...@apache.org.
IGNITE-3742: ODBC: Added support for OUTER JOIN escape sequence. This closes #1000.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3244a5c9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3244a5c9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3244a5c9
Branch: refs/heads/master
Commit: 3244a5c9dabf6d4fcaa32a661cc0adc6f8ea30de
Parents: f9ff97c
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Aug 30 11:49:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Aug 30 11:49:11 2016 +0300
----------------------------------------------------------------------
.../processors/odbc/escape/OdbcEscapeUtils.java | 32 +++---
.../odbc/OdbcEscapeSequenceSelfTest.java | 109 ++++++++++++++++++-
2 files changed, 122 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3244a5c9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
index a4b89c3..27120d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/escape/OdbcEscapeUtils.java
@@ -105,14 +105,14 @@ public class OdbcEscapeUtils {
if (nested == null)
// Found sequence without nesting, process it.
- parseRes = parseExpression(text, openPos, curPos + 1 - openPos);
+ parseRes = parseEscapeSequence(text, openPos, curPos + 1 - openPos);
else {
// Special case to process nesting.
String res0 = appendNested(text, openPos, curPos + 1, nested);
nested = null;
- parseRes = parseExpression(res0, 0, res0.length());
+ parseRes = parseEscapeSequence(res0, 0, res0.length());
}
if (earlyExit)
@@ -139,14 +139,14 @@ public class OdbcEscapeUtils {
}
/**
- * Parse concrete expression.
+ * Parse escape sequence: {escape_sequence}.
*
* @param text Text.
* @param startPos Start position within text.
* @param len Length.
* @return Result.
*/
- private static String parseExpression(String text, int startPos, int len) {
+ private static String parseEscapeSequence(String text, int startPos, int len) {
assert validSubstring(text, startPos, len);
char firstChar = text.charAt(startPos);
@@ -228,7 +228,7 @@ public class OdbcEscapeUtils {
}
/**
- * Parse standard token.
+ * Parse standard expression: {TOKEN expression}
*
* @param text Text.
* @param startPos Start position.
@@ -245,10 +245,13 @@ public class OdbcEscapeUtils {
switch (token.type()) {
case SCALAR_FUNCTION:
- return parseScalarExpression(text, startPos0, len0);
+ return parseExpression(text, startPos0, len0);
+
+ case GUID: {
+ String res = parseExpression(text, startPos0, len0, token.type(), GUID_PATTERN);
- case GUID:
- return parseExpression(text, startPos0, len0, token.type(), GUID_PATTERN);
+ return "CAST(" + res + " AS UUID)";
+ }
case DATE:
return parseExpression(text, startPos0, len0, token.type(), DATE_PATTERN);
@@ -259,6 +262,9 @@ public class OdbcEscapeUtils {
case TIMESTAMP:
return parseExpression(text, startPos0, len0, token.type(), TIMESTAMP_PATTERN);
+ case OUTER_JOIN:
+ return parseExpression(text, startPos0, len0);
+
default:
throw new IgniteException("Unsupported escape sequence token [text=" +
substring(text, startPos, len) + ", token=" + token.type().body() + ']');
@@ -266,19 +272,19 @@ public class OdbcEscapeUtils {
}
/**
- * Parse scalar function expression.
+ * Parse simple expression.
*
* @param text Text.
* @param startPos Start position.
* @param len Length.
* @return Parsed expression.
*/
- private static String parseScalarExpression(String text, int startPos, int len) {
+ private static String parseExpression(String text, int startPos, int len) {
return substring(text, startPos, len).trim();
}
/**
- * Parse concrete expression.
+ * Parse expression and validate against ODBC specification with regex pattern.
*
* @param text Text.
* @param startPos Start position.
@@ -286,12 +292,12 @@ public class OdbcEscapeUtils {
* @return Parsed expression.
*/
private static String parseExpression(String text, int startPos, int len, OdbcEscapeType type, Pattern pattern) {
- String val = substring(text, startPos, len).trim();
+ String val = parseExpression(text, startPos, len);
if (!pattern.matcher(val).matches())
throw new IgniteException("Invalid " + type + " escape sequence: " + substring(text, startPos, len));
- return "CAST(" + val + " AS UUID)";
+ return val;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3244a5c9/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
index 1aa90fd..4887a67 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcEscapeSequenceSelfTest.java
@@ -167,17 +167,17 @@ public class OdbcEscapeSequenceSelfTest extends GridCommonAbstractTest {
*/
public void testGuidEscapeSequence() {
check(
- "'12345678-9abc-def0-1234-123456789abc'",
+ "CAST('12345678-9abc-def0-1234-123456789abc' AS UUID)",
"{guid '12345678-9abc-def0-1234-123456789abc'}"
);
check(
- "select '12345678-9abc-def0-1234-123456789abc' from SomeTable;",
+ "select CAST('12345678-9abc-def0-1234-123456789abc' AS UUID) from SomeTable;",
"select {guid '12345678-9abc-def0-1234-123456789abc'} from SomeTable;"
);
check(
- "select '12345678-9abc-def0-1234-123456789abc'",
+ "select CAST('12345678-9abc-def0-1234-123456789abc' AS UUID)",
"select {guid '12345678-9abc-def0-1234-123456789abc'}"
);
}
@@ -212,17 +212,17 @@ public class OdbcEscapeSequenceSelfTest extends GridCommonAbstractTest {
*/
public void testGuidEscapeSequenceWithWhitespaces() throws Exception {
check(
- "'12345678-9abc-def0-1234-123456789abc'",
+ "CAST('12345678-9abc-def0-1234-123456789abc' AS UUID)",
"{ guid '12345678-9abc-def0-1234-123456789abc'}"
);
check(
- "'12345678-9abc-def0-1234-123456789abc'",
+ "CAST('12345678-9abc-def0-1234-123456789abc' AS UUID)",
"{ guid '12345678-9abc-def0-1234-123456789abc'}"
);
check(
- "'12345678-9abc-def0-1234-123456789abc'",
+ "CAST('12345678-9abc-def0-1234-123456789abc' AS UUID)",
"{ \n guid\n'12345678-9abc-def0-1234-123456789abc'}"
);
}
@@ -388,6 +388,103 @@ public class OdbcEscapeSequenceSelfTest extends GridCommonAbstractTest {
checkFail("select {}ts '2016-08-26 13:15:08'} from table;");
}
+
+ /**
+ * Test OUTER JOIN escape sequence.
+ */
+ public void testOuterJoinFunction() throws Exception {
+ check(
+ "t OUTER JOIN t2 ON t.id=t2.id",
+ "{oj t OUTER JOIN t2 ON t.id=t2.id}"
+ );
+
+ check(
+ "select * from t OUTER JOIN t2 ON t.id=t2.id",
+ "select * from {oj t OUTER JOIN t2 ON t.id=t2.id}"
+ );
+
+ check(
+ "select * from t OUTER JOIN t2 ON t.id=t2.id ORDER BY t2.id",
+ "select * from {oj t OUTER JOIN t2 ON t.id=t2.id} ORDER BY t2.id"
+ );
+ }
+
+ /**
+ * Test simple nested escape sequences. Depth = 2.
+ */
+ public void testNestedOuterJoin() throws Exception {
+ check(
+ "t OUTER JOIN (t2 OUTER JOIN t3 ON t2.id=t3.id) ON t.id=t2.id",
+ "{oj t OUTER JOIN ({oj t2 OUTER JOIN t3 ON t2.id=t3.id}) ON t.id=t2.id}"
+ );
+
+ check(
+ "select * from t OUTER JOIN (t2 OUTER JOIN t3 ON t2.id=t3.id) ON t.id=t2.id",
+ "select * from {oj t OUTER JOIN ({oj t2 OUTER JOIN t3 ON t2.id=t3.id}) ON t.id=t2.id}"
+ );
+
+ check(
+ "select * from t OUTER JOIN (t2 OUTER JOIN t3 ON t2.id=t3.id) ON t.id=t2.id ORDER BY t2.id",
+ "select * from {oj t OUTER JOIN ({oj t2 OUTER JOIN t3 ON t2.id=t3.id}) ON t.id=t2.id} ORDER BY t2.id"
+ );
+ }
+
+ /**
+ * Test nested escape sequences. Depth > 2.
+ */
+ public void testDeepNestedOuterJoin() {
+ check(
+ "t OUTER JOIN (t2 OUTER JOIN (t3 OUTER JOIN t4 ON t3.id=t4.id) ON t2.id=t3.id) ON t.id=t2.id",
+ "{oj t OUTER JOIN ({oj t2 OUTER JOIN ({oj t3 OUTER JOIN t4 ON t3.id=t4.id}) ON t2.id=t3.id}) ON t.id=t2.id}"
+ );
+
+ check(
+ "select * from " +
+ "t OUTER JOIN (t2 OUTER JOIN (t3 OUTER JOIN t4 ON t3.id=t4.id) ON t2.id=t3.id) ON t.id=t2.id",
+ "select * from " +
+ "{oj t OUTER JOIN ({oj t2 OUTER JOIN ({oj t3 OUTER JOIN t4 ON t3.id=t4.id}) ON t2.id=t3.id})" +
+ " ON t.id=t2.id}"
+ );
+
+ check(
+ "select * from t OUTER JOIN (t2 OUTER JOIN (t3 OUTER JOIN t4 ON t3.id=t4.id) " +
+ "ON t2.id=t3.id) ON t.id=t2.id ORDER BY t4.id",
+ "select * from {oj t OUTER JOIN ({oj t2 OUTER JOIN ({oj t3 OUTER JOIN t4 ON t3.id=t4.id}) " +
+ "ON t2.id=t3.id}) ON t.id=t2.id} ORDER BY t4.id"
+ );
+ }
+
+ /**
+ * Test invalid OUTER JOIN escape sequences.
+ */
+ public void testFailedOnInvalidOuterJoinSequence() {
+ checkFail("{ojt OUTER JOIN t2 ON t.id=t2.id}");
+
+ checkFail("select {oj t OUTER JOIN ({oj t2 OUTER JOIN t3 ON t2.id=t3.id) ON t.id=t2.id} from SomeTable;");
+
+ checkFail("select oj t OUTER JOIN t2 ON t.id=t2.id} from SomeTable;");
+ }
+
+ /**
+ * Test OUTER JOIN escape sequences with additional whitespace characters.
+ */
+ public void testOuterJoinSequenceWithWhitespaces() throws Exception {
+ check(
+ "t OUTER JOIN t2 ON t.id=t2.id",
+ "{ oj t OUTER JOIN t2 ON t.id=t2.id}"
+ );
+
+ check(
+ "t OUTER JOIN t2 ON t.id=t2.id",
+ "{ oj t OUTER JOIN t2 ON t.id=t2.id}"
+ );
+
+ check(
+ "t OUTER JOIN t2 ON t.id=t2.id",
+ " \n { oj\nt OUTER JOIN t2 ON t.id=t2.id}"
+ );
+ }
+
/**
* Check parsing logic.
*