You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/20 11:33:44 UTC
[1/5] ignite git commit: IGNITE-4511: SQL: set default index type to
"SORTED". This closes #1513.
Repository: ignite
Updated Branches:
refs/heads/ignite-4705 2029d5a1e -> 7b979880c
IGNITE-4511: SQL: set default index type to "SORTED". This closes #1513.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c9b0246
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c9b0246
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c9b0246
Branch: refs/heads/ignite-4705
Commit: 1c9b02462f38cda8fabfbc84aa6c7aae65c8d33d
Parents: f2328a4
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Feb 17 14:33:40 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Feb 17 14:33:40 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/QueryEntity.java | 4 +-
.../org/apache/ignite/cache/QueryIndex.java | 25 ++++-
.../processors/query/GridQueryProcessor.java | 15 ++-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 97 ++++++++++++++++++++
4 files changed, 130 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9b0246/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
index 5a3671b..48cdae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
@@ -190,6 +190,9 @@ public class QueryEntity implements Serializable {
if (idx.getName() == null)
idx.setName(defaultIndexName(idx));
+ if (idx.getIndexType() == null)
+ throw new IllegalArgumentException("Index type is not set " + idx.getName());
+
if (!this.idxs.containsKey(idx.getName()))
this.idxs.put(idx.getName(), idx);
else
@@ -198,7 +201,6 @@ public class QueryEntity implements Serializable {
}
}
-
/**
* Gets table name for this query entity.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9b0246/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
index af11999..53f9e4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
@@ -31,6 +31,9 @@ public class QueryIndex implements Serializable {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private static final QueryIndexType DFLT_IDX_TYP = QueryIndexType.SORTED;
+
/** Index name. */
private String name;
@@ -38,7 +41,7 @@ public class QueryIndex implements Serializable {
private LinkedHashMap<String, Boolean> fields;
/** */
- private QueryIndexType type;
+ private QueryIndexType type = DFLT_IDX_TYP;
/**
* Creates an empty index. Should be populated via setters.
@@ -163,9 +166,12 @@ public class QueryIndex implements Serializable {
* Sets index name.
*
* @param name Index name.
+ * @return {@code this} for chaining.
*/
- public void setName(String name) {
+ public QueryIndex setName(String name) {
this.name = name;
+
+ return this;
}
/**
@@ -181,9 +187,12 @@ public class QueryIndex implements Serializable {
* Sets fields included in the index.
*
* @param fields Collection of index fields.
+ * @return {@code this} for chaining.
*/
- public void setFields(LinkedHashMap<String, Boolean> fields) {
+ public QueryIndex setFields(LinkedHashMap<String, Boolean> fields) {
this.fields = fields;
+
+ return this;
}
/**
@@ -199,12 +208,15 @@ public class QueryIndex implements Serializable {
*
* @param fields Collection of fields.
* @param asc Ascending flag.
+ * @return {@code this} for chaining.
*/
- public void setFieldNames(Collection<String> fields, boolean asc) {
+ public QueryIndex setFieldNames(Collection<String> fields, boolean asc) {
this.fields = new LinkedHashMap<>();
for (String field : fields)
this.fields.put(field, asc);
+
+ return this;
}
/**
@@ -220,8 +232,11 @@ public class QueryIndex implements Serializable {
* Sets index type.
*
* @param type Index type.
+ * @return {@code this} for chaining.
*/
- public void setIndexType(QueryIndexType type) {
+ public QueryIndex setIndexType(QueryIndexType type) {
this.type = type;
+
+ return this;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9b0246/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 0ff6d8b..24ccb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1566,8 +1566,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (idxName == null)
idxName = QueryEntity.defaultIndexName(idx);
- if (idx.getIndexType() == QueryIndexType.SORTED || idx.getIndexType() == QueryIndexType.GEOSPATIAL) {
- d.addIndex(idxName, idx.getIndexType() == QueryIndexType.SORTED ? SORTED : GEO_SPATIAL);
+ QueryIndexType idxTyp = idx.getIndexType();
+
+ if (idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL) {
+ d.addIndex(idxName, idxTyp == QueryIndexType.SORTED ? SORTED : GEO_SPATIAL);
int i = 0;
@@ -1583,9 +1585,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
d.addFieldToIndex(idxName, field, i++, !asc);
}
}
- else {
- assert idx.getIndexType() == QueryIndexType.FULLTEXT;
-
+ else if (idxTyp == QueryIndexType.FULLTEXT){
for (String field : idx.getFields().keySet()) {
String alias = aliases.get(field);
@@ -1595,6 +1595,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
d.addFieldToTextIndex(field);
}
}
+ else if (idxTyp != null)
+ throw new IllegalArgumentException("Unsupported index type [idx=" + idx.getName() +
+ ", typ=" + idxTyp + ']');
+ else
+ throw new IllegalArgumentException("Index type is not set: " + idx.getName());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9b0246/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 81c28a3..2b240e9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -207,6 +207,21 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
entityList.add(qryEntity);
+ qryEntity = new QueryEntity();
+
+ qryEntity.setKeyType(Integer.class.getName());
+ qryEntity.setValueType(ObjectValue2.class.getName());
+
+ qryEntity.addQueryField("strVal", String.class.getName(), null);
+
+ final QueryIndex strIdx = new QueryIndex(); // Default index type
+
+ strIdx.setFieldNames(Collections.singletonList("strVal"), true);
+
+ qryEntity.setIndexes(Arrays.asList(strIdx));
+
+ entityList.add(qryEntity);
+
cc.setQueryEntities(entityList);
if (cacheMode() != CacheMode.LOCAL)
@@ -759,6 +774,40 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
*
* @throws Exception In case of error.
*/
+ public void testObjectWithString() throws Exception {
+ IgniteCache<Integer, ObjectValue2> cache = ignite().cache(null);
+
+ cache.put(1, new ObjectValue2("value 1"));
+ cache.put(2, new ObjectValue2("value 2"));
+ cache.put(3, new ObjectValue2("value 3"));
+
+ QueryCursor<Cache.Entry<Integer, ObjectValue2>> qry
+ = cache.query(new SqlQuery<Integer, ObjectValue2>(ObjectValue2.class, "strVal like ?").setArgs("value%"));
+
+ int expCnt = 3;
+
+ List<Cache.Entry<Integer, ObjectValue2>> results = qry.getAll();
+
+ assertEquals(expCnt, results.size());
+
+ qry = cache.query(new SqlQuery<Integer, ObjectValue2>(ObjectValue2.class, "strVal > ?").setArgs("value 1"));
+
+ results = qry.getAll();
+
+ assertEquals(expCnt - 1, results.size());
+
+ qry = cache.query(new TextQuery<Integer, ObjectValue2>(ObjectValue2.class, "value"));
+
+ results = qry.getAll();
+
+ assertEquals(0, results.size()); //Should fail for FULL_TEXT index, but SORTED
+ }
+
+ /**
+ * JUnit.
+ *
+ * @throws Exception In case of error.
+ */
public void testEnumObjectQuery() throws Exception {
final IgniteCache<Long, EnumObject> cache = ignite().cache(null);
@@ -2061,6 +2110,54 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
}
/**
+ * Another test value object.
+ */
+ private static class ObjectValue2 {
+ /** Value. */
+ private String strVal;
+
+ /**
+ * @param strVal String value.
+ */
+ ObjectValue2(String strVal) {
+ this.strVal = strVal;
+ }
+
+ /**
+ * Gets value.
+ *
+ * @return Value.
+ */
+ public String value() {
+ return strVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ObjectValue2 other = (ObjectValue2)o;
+
+ return strVal == null ? other.strVal == null : strVal.equals(other.strVal);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return strVal != null ? strVal.hashCode() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ObjectValue2.class, this);
+ }
+ }
+
+ /**
* Empty test object.
*/
@SuppressWarnings("UnusedDeclaration")
[3/5] ignite git commit: IGNITE-4611 Write IgniteUuid with
BinaryMarshaller
Posted by sb...@apache.org.
IGNITE-4611 Write IgniteUuid with BinaryMarshaller
This closes #1551
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53802d8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53802d8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53802d8b
Branch: refs/heads/ignite-4705
Commit: 53802d8b46f8ee09d51ba84267f65dc6f7e73a2f
Parents: ab1b685
Author: Vyacheslav Daradur <da...@gmail.com>
Authored: Mon Feb 20 11:22:49 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Feb 20 11:22:49 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/binary/BinaryContext.java | 3 +++
.../java/org/apache/ignite/lang/IgniteUuid.java | 25 +++++++++++++++++++-
.../binary/BinaryMarshallerSelfTest.java | 10 ++++++++
3 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/53802d8b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index ec2fc7b..4773e3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
@@ -194,6 +195,8 @@ public class BinaryContext {
sysClss.add(GridClosureProcessor.C4V2.class.getName());
sysClss.add(GridClosureProcessor.C4MLAV2.class.getName());
+ sysClss.add(IgniteUuid.class.getName());
+
if (BinaryUtils.wrapTrees()) {
sysClss.add(TreeMap.class.getName());
sysClss.add(TreeSet.class.getName());
http://git-wip-us.apache.org/repos/asf/ignite/blob/53802d8b/modules/core/src/main/java/org/apache/ignite/lang/IgniteUuid.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteUuid.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteUuid.java
index 7de389e..4b93261 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteUuid.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteUuid.java
@@ -24,6 +24,12 @@ import java.io.ObjectOutput;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -34,7 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
* 10x time faster for ID creation. It uses extra memory for 8-byte counter additionally to
* internal UUID.
*/
-public final class IgniteUuid implements Comparable<IgniteUuid>, Iterable<IgniteUuid>, Cloneable, Externalizable {
+public final class IgniteUuid implements Comparable<IgniteUuid>, Iterable<IgniteUuid>, Cloneable, Externalizable, Binarylizable {
/** */
private static final long serialVersionUID = 0L;
@@ -203,6 +209,23 @@ public final class IgniteUuid implements Comparable<IgniteUuid>, Iterable<Ignite
}
/** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ BinaryRawWriter out = writer.rawWriter();
+
+ out.writeLong(locId);
+ out.writeLong(gid.getMostSignificantBits());
+ out.writeLong(gid.getLeastSignificantBits());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ BinaryRawReader in = reader.rawReader();
+
+ locId = in.readLong();
+ gid = new UUID(in.readLong(), in.readLong());
+ }
+
+ /** {@inheritDoc} */
@Override public int hashCode() {
return 31 * gid.hashCode() + (int)(locId ^ (locId >>> 32));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53802d8b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index 5bfc95c..1cac1a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
@@ -294,6 +295,15 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testIgniteUuid() throws Exception {
+ IgniteUuid uuid = IgniteUuid.randomUuid();
+
+ assertEquals(uuid, marshalUnmarshal(uuid));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testDate() throws Exception {
Date date = new Date();
[4/5] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-2.0' into ignite-4705
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/743f8b40
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/743f8b40
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/743f8b40
Branch: refs/heads/ignite-4705
Commit: 743f8b40a7a4721e287e1645b5d8ce44c3aae707
Parents: 2029d5a 53802d8
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 20 11:54:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 20 11:54:51 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/QueryEntity.java | 4 +-
.../org/apache/ignite/cache/QueryIndex.java | 25 +++-
.../ignite/internal/binary/BinaryContext.java | 3 +
.../utils/PlatformConfigurationUtils.java | 64 ++++++++++-
.../processors/query/GridQueryProcessor.java | 15 ++-
.../java/org/apache/ignite/lang/IgniteUuid.java | 25 +++-
...PlatformCachePluginConfigurationClosure.java | 31 +++++
...mCachePluginConfigurationClosureFactory.java | 37 ++++++
...atformCachePluginConfigurationClosureFactory | 1 +
.../binary/BinaryMarshallerSelfTest.java | 10 ++
.../cache/PlatformGetCachePluginsTask.java | 85 ++++++++++++++
.../PlatformTestCachePluginConfiguration.java | 60 ++++++++++
...formTestCachePluginConfigurationClosure.java | 48 ++++++++
...tCachePluginConfigurationClosureFactory.java | 37 ++++++
.../cache/PlatformTestCachePluginProvider.java | 73 ++++++++++++
.../cache/IgniteCacheAbstractQuerySelfTest.java | 97 ++++++++++++++++
.../Apache.Ignite.Core.Tests.csproj | 2 +
.../Cache/CacheJavaPluginConfiguration.cs | 45 ++++++++
.../Plugin/Cache/CacheJavaPluginTest.cs | 113 +++++++++++++++++++
.../Plugin/Cache/CachePluginConfiguration.cs | 24 ++++
.../Plugin/Cache/CachePluginTest.cs | 7 +-
.../Cache/Configuration/CacheConfiguration.cs | 22 +++-
.../Plugin/Cache/ICachePluginConfiguration.cs | 18 ++-
23 files changed, 822 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
[2/5] ignite git commit: IGNITE-4711 Propagate platform cache plugin
configuration to Java
Posted by sb...@apache.org.
IGNITE-4711 Propagate platform cache plugin configuration to Java
This closes #1552
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab1b6850
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab1b6850
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab1b6850
Branch: refs/heads/ignite-4705
Commit: ab1b68507cae85eb21fd2e0479a8c9f3148fabc2
Parents: 1c9b024
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Feb 17 17:18:36 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Feb 17 17:18:36 2017 +0300
----------------------------------------------------------------------
.../utils/PlatformConfigurationUtils.java | 64 ++++++++++-
...PlatformCachePluginConfigurationClosure.java | 31 +++++
...mCachePluginConfigurationClosureFactory.java | 37 ++++++
...atformCachePluginConfigurationClosureFactory | 1 +
.../cache/PlatformGetCachePluginsTask.java | 85 ++++++++++++++
.../PlatformTestCachePluginConfiguration.java | 60 ++++++++++
...formTestCachePluginConfigurationClosure.java | 48 ++++++++
...tCachePluginConfigurationClosureFactory.java | 37 ++++++
.../cache/PlatformTestCachePluginProvider.java | 73 ++++++++++++
.../Apache.Ignite.Core.Tests.csproj | 2 +
.../Cache/CacheJavaPluginConfiguration.cs | 45 ++++++++
.../Plugin/Cache/CacheJavaPluginTest.cs | 113 +++++++++++++++++++
.../Plugin/Cache/CachePluginConfiguration.cs | 24 ++++
.../Plugin/Cache/CachePluginTest.cs | 7 +-
.../Cache/Configuration/CacheConfiguration.cs | 22 +++-
.../Plugin/Cache/ICachePluginConfiguration.cs | 18 ++-
16 files changed, 655 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index f295ff5..bce3735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -56,6 +56,8 @@ import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration;
import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative;
import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration;
import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosure;
+import org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosureFactory;
import org.apache.ignite.plugin.platform.PlatformPluginConfigurationClosure;
import org.apache.ignite.plugin.platform.PlatformPluginConfigurationClosureFactory;
import org.apache.ignite.spi.communication.CommunicationSpi;
@@ -80,6 +82,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -214,12 +217,23 @@ public class PlatformConfigurationUtils {
int pluginCnt = in.readInt();
if (pluginCnt > 0) {
- CachePluginConfiguration[] plugins = new CachePluginConfiguration[pluginCnt];
+ ArrayList<CachePluginConfiguration> plugins = new ArrayList<>();
+
+ for (int i = 0; i < pluginCnt; i++) {
+ if (in.readBoolean()) {
+ // Java cache plugin.
+ readCachePluginConfiguration(ccfg, in);
+ } else {
+ // Platform cache plugin.
+ plugins.add(new PlatformCachePluginConfiguration(in.readObjectDetached()));
+ }
+ }
- for (int i = 0; i < pluginCnt; i++)
- plugins[i] = new PlatformCachePluginConfiguration(in.readObjectDetached());
+ if (ccfg.getPluginConfigurations() != null) {
+ Collections.addAll(plugins, ccfg.getPluginConfigurations());
+ }
- ccfg.setPluginConfigurations(plugins);
+ ccfg.setPluginConfigurations(plugins.toArray(new CachePluginConfiguration[plugins.size()]));
}
return ccfg;
@@ -1325,6 +1339,48 @@ public class PlatformConfigurationUtils {
}
/**
+ * Reads the plugin configuration.
+ *
+ * @param cfg Ignite configuration to update.
+ * @param in Reader.
+ */
+ private static void readCachePluginConfiguration(CacheConfiguration cfg, BinaryRawReader in) {
+ int plugCfgFactoryId = in.readInt();
+
+ PlatformCachePluginConfigurationClosure plugCfg = cachePluginConfiguration(plugCfgFactoryId);
+
+ plugCfg.apply(cfg, in);
+ }
+
+ /**
+ * Create PlatformCachePluginConfigurationClosure for the given factory ID.
+ *
+ * @param factoryId Factory ID.
+ * @return PlatformCachePluginConfigurationClosure.
+ */
+ private static PlatformCachePluginConfigurationClosure cachePluginConfiguration(final int factoryId) {
+ PlatformCachePluginConfigurationClosureFactory factory = AccessController.doPrivileged(
+ new PrivilegedAction<PlatformCachePluginConfigurationClosureFactory>() {
+ @Override public PlatformCachePluginConfigurationClosureFactory run() {
+ for (PlatformCachePluginConfigurationClosureFactory factory :
+ ServiceLoader.load(PlatformCachePluginConfigurationClosureFactory.class)) {
+ if (factory.id() == factoryId)
+ return factory;
+ }
+
+ return null;
+ }
+ });
+
+ if (factory == null) {
+ throw new IgniteException("PlatformPluginConfigurationClosureFactory is not found " +
+ "(did you put into the classpath?): " + factoryId);
+ }
+
+ return factory.create();
+ }
+
+ /**
* Private constructor.
*/
private PlatformConfigurationUtils() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosure.java
new file mode 100644
index 0000000..b03d84b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosure.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.platform;
+
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteBiInClosure;
+
+/**
+ * Platform cache configuration handler:
+ * updates plugin configuration using data sent from platform code.
+ */
+public interface PlatformCachePluginConfigurationClosure
+ extends IgniteBiInClosure<CacheConfiguration, BinaryRawReader> {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosureFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosureFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosureFactory.java
new file mode 100644
index 0000000..d6398ea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/platform/PlatformCachePluginConfigurationClosureFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.platform;
+
+/**
+ * Factory for @{@link PlatformCachePluginConfigurationClosure} with a unique id.
+ */
+public interface PlatformCachePluginConfigurationClosureFactory {
+ /**
+ * Gets the factory id.
+ *
+ * @return Factory id.
+ */
+ int id();
+
+ /**
+ * Creates configuration closure instance.
+ *
+ * @return Configuration closure instance.
+ */
+ PlatformCachePluginConfigurationClosure create();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosureFactory
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosureFactory b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosureFactory
new file mode 100644
index 0000000..a99b3b9
--- /dev/null
+++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosureFactory
@@ -0,0 +1 @@
+org.apache.ignite.platform.plugin.cache.PlatformTestCachePluginConfigurationClosureFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformGetCachePluginsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformGetCachePluginsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformGetCachePluginsTask.java
new file mode 100644
index 0000000..cfcf0d5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformGetCachePluginsTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform.plugin.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task to get a list of cache plugins.
+ */
+public class PlatformGetCachePluginsTask extends ComputeTaskAdapter<String, String[]> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable String arg) {
+ return Collections.singletonMap(new GetCachePluginsJob(arg), F.first(subgrid));
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String[] reduce(List<ComputeJobResult> results) {
+ return results.get(0).getData();
+ }
+
+ /**
+ * Job.
+ */
+ @SuppressWarnings("unchecked")
+ private static class GetCachePluginsJob extends ComputeJobAdapter {
+ /** */
+ private final String cacheName;
+
+ /** */
+ @IgniteInstanceResource
+ private final Ignite ignite;
+
+ /** */
+ GetCachePluginsJob(String cacheName) {
+ this.cacheName = cacheName;
+ ignite = null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String[] execute() {
+ CachePluginConfiguration[] cfg =
+ ignite.cache(cacheName).getConfiguration(CacheConfiguration.class).getPluginConfigurations();
+
+ if (cfg == null)
+ return null;
+
+ String[] res = new String[cfg.length];
+
+ for (int i = 0; i < cfg.length; i++)
+ res[i] = cfg[i].getClass().getName();
+
+ return res;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfiguration.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfiguration.java
new file mode 100644
index 0000000..88a021a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform.plugin.cache;
+
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+
+/**
+ * Test plugin configuration.
+ */
+public class PlatformTestCachePluginConfiguration implements CachePluginConfiguration {
+ /** */
+ private String pluginProperty;
+
+ /**
+ * Initializes a new instance of PlatformTestPluginConfiguration.
+ */
+ PlatformTestCachePluginConfiguration() {
+ // No-op.
+ }
+
+ /**
+ * Gets the plugin property.
+ *
+ * @return Plugin property.
+ */
+ public String pluginProperty() {
+ return pluginProperty;
+ }
+
+ /**
+ * Sets the plugin property.
+ *
+ * @param pluginProperty Value.
+ */
+ void setPluginProperty(String pluginProperty) {
+ this.pluginProperty = pluginProperty;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CachePluginProvider createProvider(CachePluginContext ctx) {
+ return new PlatformTestCachePluginProvider();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosure.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosure.java
new file mode 100644
index 0000000..2763c4f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosure.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform.plugin.cache;
+
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosure;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Test config closure.
+ */
+public class PlatformTestCachePluginConfigurationClosure implements PlatformCachePluginConfigurationClosure {
+ /** {@inheritDoc} */
+ @Override public void apply(CacheConfiguration cacheConfiguration, BinaryRawReader reader) {
+ ArrayList<CachePluginConfiguration> cfgs = new ArrayList<>();
+
+ if (cacheConfiguration.getPluginConfigurations() != null) {
+ Collections.addAll(cfgs, cacheConfiguration.getPluginConfigurations());
+ }
+
+ PlatformTestCachePluginConfiguration plugCfg = new PlatformTestCachePluginConfiguration();
+
+ plugCfg.setPluginProperty(reader.readString());
+
+ cfgs.add(plugCfg);
+
+ cacheConfiguration.setPluginConfigurations(cfgs.toArray(new CachePluginConfiguration[cfgs.size()]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosureFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosureFactory.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosureFactory.java
new file mode 100644
index 0000000..85fa8e4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginConfigurationClosureFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform.plugin.cache;
+
+import org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosure;
+import org.apache.ignite.plugin.platform.PlatformCachePluginConfigurationClosureFactory;
+
+/**
+ * Test closure factory.
+ */
+public class PlatformTestCachePluginConfigurationClosureFactory
+ implements PlatformCachePluginConfigurationClosureFactory {
+ /** {@inheritDoc} */
+ @Override public int id() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformCachePluginConfigurationClosure create() {
+ return new PlatformTestCachePluginConfigurationClosure();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginProvider.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginProvider.java
new file mode 100644
index 0000000..1dcd81f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/cache/PlatformTestCachePluginProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform.plugin.cache;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.Cache;
+
+/**
+ * Test cache plugin provider.
+ */
+public class PlatformTestCachePluginProvider implements CachePluginProvider {
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validate() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateRemote(CacheConfiguration locCfg, CachePluginConfiguration locPluginCcfg,
+ CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object unwrapCacheEntry(Cache.Entry entry, Class cls) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object createComponent(Class cls) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 71b0593..0eb3e39 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -91,6 +91,8 @@
<Compile Include="Log\DefaultLoggerTest.cs" />
<Compile Include="Log\Log4NetLoggerTest.cs" />
<Compile Include="Log\NLogLoggerTest.cs" />
+ <Compile Include="Plugin\Cache\CacheJavaPluginConfiguration.cs" />
+ <Compile Include="Plugin\Cache\CacheJavaPluginTest.cs" />
<Compile Include="Plugin\PluginTest.cs" />
<Compile Include="Plugin\TestIgnitePlugin.cs" />
<Compile Include="Plugin\TestIgnitePluginConfiguration.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginConfiguration.cs
new file mode 100644
index 0000000..2db875d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginConfiguration.cs
@@ -0,0 +1,45 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Plugin.Cache
+{
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Plugin.Cache;
+
+ /// <summary>
+ /// Configuration with a plugin with Java part.
+ /// </summary>
+ public class CacheJavaPluginConfiguration : ICachePluginConfiguration
+ {
+ /// <summary>
+ /// Gets or sets the custom property.
+ /// </summary>
+ public string Foo { get; set; }
+
+ /** <inheritdoc /> */
+ public int? CachePluginConfigurationClosureFactoryId
+ {
+ get { return 0; }
+ }
+
+ /** <inheritdoc /> */
+ public void WriteBinary(IBinaryRawWriter writer)
+ {
+ writer.WriteString(Foo);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginTest.cs
new file mode 100644
index 0000000..56d2b90
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CacheJavaPluginTest.cs
@@ -0,0 +1,113 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Plugin.Cache
+{
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests the plugin with Java part.
+ /// </summary>
+ public class CacheJavaPluginTest
+ {
+ /** */
+ private const string CacheName = "staticCache";
+
+ /** */
+ private const string DynCacheName = "dynamicCache";
+
+ /** */
+ private const string GetPluginsTask = "org.apache.ignite.platform.plugin.cache.PlatformGetCachePluginsTask";
+
+ /** */
+ private const string PluginConfigurationClass =
+ "org.apache.ignite.platform.plugin.cache.PlatformTestCachePluginConfiguration";
+
+ /** */
+ private IIgnite _grid;
+
+ /// <summary>
+ /// Fixture set up.
+ /// </summary>
+ [TestFixtureSetUp]
+ public void FixtureSetUp()
+ {
+ var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ CacheConfiguration = new[]
+ {
+ new CacheConfiguration(CacheName)
+ {
+ PluginConfigurations = new[] {new CacheJavaPluginConfiguration()}
+ }
+ }
+ };
+
+ _grid = Ignition.Start(cfg);
+ }
+
+ /// <summary>
+ /// Fixture tear down.
+ /// </summary>
+ [TestFixtureTearDown]
+ public void FixtureTearDown()
+ {
+ Ignition.StopAll(true);
+ }
+
+ /// <summary>
+ /// Tests that cache plugin works with static cache.
+ /// </summary>
+ [Test]
+ public void TestStaticCache()
+ {
+ var cache = _grid.GetCache<int, int>(CacheName);
+
+ VerifyCachePlugin(cache);
+ }
+
+ /// <summary>
+ /// Tests that cache plugin works with static cache.
+ /// </summary>
+ [Test]
+ public void TestDynamicCache()
+ {
+ var cache = _grid.CreateCache<int, int>(new CacheConfiguration(DynCacheName)
+ {
+ PluginConfigurations = new[] {new CacheJavaPluginConfiguration()}
+ });
+
+ VerifyCachePlugin(cache);
+ }
+
+ /// <summary>
+ /// Verifies the cache plugin.
+ /// </summary>
+ private void VerifyCachePlugin(ICache<int, int> cache)
+ {
+ Assert.IsNull(cache.GetConfiguration().PluginConfigurations); // Java cache plugins are not returned.
+
+ cache[1] = 1;
+ Assert.AreEqual(1, cache[1]);
+
+ var plugins = _grid.GetCompute().ExecuteJavaTask<string[]>(GetPluginsTask, cache.Name);
+ Assert.AreEqual(new[] {PluginConfigurationClass}, plugins);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginConfiguration.cs
index 4627aa0..72220ff 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginConfiguration.cs
@@ -18,7 +18,9 @@
namespace Apache.Ignite.Core.Tests.Plugin.Cache
{
using System;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Plugin.Cache;
+ using NUnit.Framework;
/// <summary>
/// Cache plugin config.
@@ -35,6 +37,28 @@ namespace Apache.Ignite.Core.Tests.Plugin.Cache
/// <summary>
/// Gets or sets a value indicating whether the plugin should throw an error.
/// </summary>
+ // ReSharper disable once UnusedAutoPropertyAccessor.Global
public bool ThrowError { get; set; }
+
+ /// <summary>
+ /// Gets the id to locate PlatformCachePluginConfigurationClosureFactory on Java side
+ /// and read the data written by
+ /// <see cref="WriteBinary(IBinaryRawWriter)" /> method.
+ /// </summary>
+ public int? CachePluginConfigurationClosureFactoryId
+ {
+ get { return null; }
+ }
+
+ /// <summary>
+ /// Writes this instance to a raw writer.
+ /// This method will be called when <see cref="CachePluginConfigurationClosureFactoryId" />
+ /// is not null to propagate configuration to the Java side.
+ /// </summary>
+ /// <param name="writer">The writer.</param>
+ public void WriteBinary(IBinaryRawWriter writer)
+ {
+ Assert.Fail("Should not be called");
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginTest.cs
index 583d314..0fc8877 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/Cache/CachePluginTest.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests.Plugin.Cache
using System;
using System.Collections.Generic;
using System.Linq;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Plugin.Cache;
@@ -202,14 +203,16 @@ namespace Apache.Ignite.Core.Tests.Plugin.Cache
[CachePluginProviderType(typeof(CachePlugin))]
private class NonSerializableCachePluginConfig : ICachePluginConfiguration
{
- // No-op.
+ public int? CachePluginConfigurationClosureFactoryId { get { return null; } }
+ public void WriteBinary(IBinaryRawWriter writer) { /* No-op. */ }
}
[Serializable]
[CachePluginProviderType(typeof(string))]
private class ThrowCachePluginConfig : ICachePluginConfiguration
{
- // No-op.
+ public int? CachePluginConfigurationClosureFactoryId { get { return null; } }
+ public void WriteBinary(IBinaryRawWriter writer) { /* No-op. */ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
index 0e270dc..ebf412d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
@@ -375,11 +375,23 @@ namespace Apache.Ignite.Core.Cache.Configuration
throw new InvalidOperationException("Invalid cache configuration: " +
"ICachePluginConfiguration can't be null.");
- if (!cachePlugin.GetType().IsSerializable)
- throw new InvalidOperationException("Invalid cache configuration: " +
- "ICachePluginConfiguration should be Serializable.");
-
- writer.WriteObject(cachePlugin);
+ if (cachePlugin.CachePluginConfigurationClosureFactoryId != null)
+ {
+ writer.WriteBoolean(true);
+ writer.WriteInt(cachePlugin.CachePluginConfigurationClosureFactoryId.Value);
+ cachePlugin.WriteBinary(writer);
+ }
+ else
+ {
+ if (!cachePlugin.GetType().IsSerializable)
+ {
+ throw new InvalidOperationException("Invalid cache configuration: " +
+ "ICachePluginConfiguration should be Serializable.");
+ }
+
+ writer.WriteBoolean(false);
+ writer.WriteObject(cachePlugin);
+ }
}
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab1b6850/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/Cache/ICachePluginConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/Cache/ICachePluginConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/Cache/ICachePluginConfiguration.cs
index 5ea3b51..1790357 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/Cache/ICachePluginConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/Cache/ICachePluginConfiguration.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Core.Plugin.Cache
{
using System.Diagnostics.CodeAnalysis;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache.Configuration;
/// <summary>
@@ -45,6 +46,21 @@ namespace Apache.Ignite.Core.Plugin.Cache
[SuppressMessage("Microsoft.Design", "CA1040:AvoidEmptyInterfaces")]
public interface ICachePluginConfiguration
{
- // No-op.
+ /// <summary>
+ /// Gets the id to locate PlatformCachePluginConfigurationClosureFactory on Java side
+ /// and read the data written by <see cref="WriteBinary"/> method.
+ /// <para />
+ /// When this property is not null, all cache plugin functionality is delegated to Java part.
+ /// <see cref="ICachePluginProvider{TConfig}"/> won't be invoked.
+ /// </summary>
+ int? CachePluginConfigurationClosureFactoryId { get; }
+
+ /// <summary>
+ /// Writes this instance to a raw writer.
+ /// This method will be called when <see cref="CachePluginConfigurationClosureFactoryId"/>
+ /// is not null to propagate configuration to the Java side.
+ /// </summary>
+ /// <param name="writer">The writer.</param>
+ void WriteBinary(IBinaryRawWriter writer);
}
}
[5/5] ignite git commit: ignite-4705
Posted by sb...@apache.org.
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b979880
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b979880
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b979880
Branch: refs/heads/ignite-4705
Commit: 7b979880cd1f987d9e707627339f3cb8890ceb30
Parents: 743f8b4
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 20 12:43:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 20 14:33:40 2017 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 2 +
.../apache/ignite/internal/IgniteKernal.java | 1 +
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheIoManager.java | 3 -
.../processors/cache/GridCacheMessage.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 9 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 31 ++-
.../dht/atomic/GridDhtAtomicCache.java | 76 ++++++--
.../GridDhtAtomicDeferredUpdateResponse.java | 6 +
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 3 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 3 +-
.../GridNearAtomicAbstractUpdateFuture.java | 6 +-
.../atomic/GridNearAtomicMappingResponse.java | 193 +++++++++++++++++++
.../GridNearAtomicSingleUpdateFuture.java | 34 +++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 15 +-
15 files changed, 352 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 6636bf2..50876dd 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -196,6 +197,7 @@ public class MessageCodeGenerator {
gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
+ gen.generateAndWrite(GridNearAtomicMappingResponse.class);
// gen.generateAndWrite(GridMessageCollection.class);
// gen.generateAndWrite(DataStreamerEntry.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 750c316..9f01615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1151,6 +1151,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
" ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
+ " ^-- Futures [futs=" + ctx.cache().context().mvcc().atomicFuturesCount() + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
" ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5ed46ff..908d1d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -175,6 +176,11 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -47:
+ msg = new GridNearAtomicMappingResponse();
+
+ break;
+
case -46:
msg = new UpdateErrors();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0f7371d..4d3ab1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -362,9 +362,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled)
cctx.deploy().ignoreOwnership(true);
- if (!cacheMsg.partitionExchangeMessage())
- log.info("Cache message: " + cacheMsg);
-
unmarshall(nodeId, cacheMsg);
if (cacheMsg.classError() != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 3ec5323..f3acf66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
private static final long serialVersionUID = 0L;
/** Maximum number of cache lookup indexes. */
- public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 6;
+ public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7;
/** Cache message index field name. */
public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index e8a5c8d..7f9f18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -107,7 +107,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
/** Pending atomic futures. */
- private final ConcurrentMap<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
+ private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
/** Pending data streamer futures. */
private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
@@ -453,6 +453,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Collection of pending atomic futures.
+ */
+ public int atomicFuturesCount() {
+ return atomicFuts.size();
+ }
+
+ /**
* @return Collection of pending data streamer futures.
*/
public Collection<DataStreamerFuture> dataStreamerFutures() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index da6616b..b512bdc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -81,7 +82,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Completion callback. */
@GridToStringExclude
- private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+ private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
/** Update request. */
protected final GridNearAtomicAbstractUpdateRequest updateReq;
@@ -108,7 +109,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*/
protected GridDhtAtomicAbstractUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes) {
@@ -367,9 +368,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
List<UUID> dhtNodes = null;
if (fullSync) {
- dhtNodes = new ArrayList<>(mappings.size());
+ if (!F.isEmpty(mappings)) {
+ dhtNodes = new ArrayList<>(mappings.size());
- dhtNodes.addAll(mappings.keySet());
+ dhtNodes.addAll(mappings.keySet());
+ }
+ else
+ dhtNodes = Collections.emptyList();
if (primaryReplyToNear)
updateRes.mapping(dhtNodes);
@@ -380,9 +385,25 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (primaryReplyToNear)
completionCb.apply(updateReq, updateRes);
+ else {
+ if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) {
+ GridNearAtomicMappingResponse mappingRes = new GridNearAtomicMappingResponse(
+ cctx.cacheId(),
+ updateReq.partition(),
+ updateReq.futureId(),
+ dhtNodes);
+
+ try {
+ cctx.io().send(updateRes.nodeId(), mappingRes, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send mapping response [futId=" + futId +
+ ", writeVer=" + writeVer + ", node=" + updateRes.nodeId() + ']');
+ }
+ }
+ }
}
else {
- // Reply.
completionCb.apply(updateReq, updateRes);
onDone();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d402c86..73a5acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -35,6 +35,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -110,7 +111,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -137,9 +137,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
+ /** */
+ static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR = IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false);
+
/** Update reply closure. */
@GridToStringExclude
- private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+ private UpdateReplyClosure updateReplyClos;
/** Pending */
private GridDeferredAckMessageSender deferredUpdateMsgSnd;
@@ -208,7 +211,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override protected void init() {
super.init();
- updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+ updateReplyClos = new UpdateReplyClosure() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (req.writeSynchronizationMode() != FULL_ASYNC)
@@ -231,6 +234,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
+ // TODO IGNITE-4705.
+ log.info("Atomic cache start [name=" + name() +
+ ", mode=" + configuration().getWriteSynchronizationMode() +
+ ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR + ']');
+
deferredUpdateMsgSnd = new GridDeferredAckMessageSender<Long>(ctx.time(), ctx.closures()) {
@Override public int getTimeout() {
return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
@@ -423,6 +431,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(),
+ GridNearAtomicMappingResponse.class,
+ new CI2<UUID, GridNearAtomicMappingResponse>() {
+ @Override public void apply(UUID uuid, GridNearAtomicMappingResponse msg) {
+ processDhtAtomicNearMappingResponse(uuid, msg);
+ }
+
+ @Override public String toString() {
+ return "GridNearAtomicMappingResponse handler " +
+ "[msgIdx=" + GridNearAtomicMappingResponse.CACHE_MSG_IDX + ']';
+ }
+ });
+
if (near == null) {
ctx.io().addHandler(
ctx.cacheId(),
@@ -1697,10 +1718,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param req Update request.
* @param completionCb Completion callback.
*/
- public void updateAllAsyncInternal(
+ void updateAllAsyncInternal(
final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ final UpdateReplyClosure completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
@@ -1749,7 +1770,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void onForceKeysError(final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final UpdateReplyClosure completionCb,
IgniteCheckedException e
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
@@ -1772,7 +1793,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private void updateAllAsyncInternal0(
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ UpdateReplyClosure completionCb
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(),
ctx.deploymentEnabled());
@@ -1997,7 +2018,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final List<GridDhtCacheEntry> locked,
final GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final GridDhtAtomicCache.UpdateReplyClosure completionCb,
final boolean replicate,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
@@ -2420,7 +2441,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -2651,7 +2672,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable final Collection<KeyCacheObject> rmvKeys,
@Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final GridDhtAtomicCache.UpdateReplyClosure completionCb,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
final boolean replicate,
@@ -3133,7 +3154,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
boolean force
) {
if (updateReq.size() == 1)
@@ -3370,6 +3391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * @param primaryId Primary node ID.
* @param req Request.
* @param nearRes Response to send.
*/
@@ -3424,6 +3446,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Node ID.
* @param res Response.
*/
+ private void processDhtAtomicNearMappingResponse(UUID nodeId, GridNearAtomicMappingResponse res) {
+ GridNearAtomicAbstractUpdateFuture updateFut =
+ (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+
+ if (updateFut != null) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Received near mapping response [futId=" + res.futureId() +
+ ", node=" + nodeId + ']');
+ }
+
+ updateFut.onMappingReceived(nodeId, res);
+ }
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to find future for near mapping response [futId=" + res.futureId() +
+ ", node=" + nodeId +
+ ", res=" + res + ']');
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
GridNearAtomicAbstractUpdateFuture updateFut =
(GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
@@ -3663,4 +3710,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return Collections.emptyList();
}
}
+
+ /**
+ *
+ */
+ static interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index bd2bae0..671d516 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -147,4 +148,9 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
@Override public byte fieldsCount() {
return 4;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicDeferredUpdateResponse.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 4ee90a0..1e63165 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -58,7 +57,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
*/
GridDhtAtomicSingleUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 20c3d4f..dece1d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,7 +60,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
*/
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 82f171d..15075c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -197,7 +197,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/**
* Performs future mapping.
*/
- public void map() {
+ public final void map() {
AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
if (topVer == null)
@@ -257,7 +257,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+ new GridDhtAtomicCache.UpdateReplyClosure() {
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res, false);
}
@@ -300,6 +300,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
public abstract void onResult(UUID nodeId, GridDhtAtomicNearResponse res);
+ public abstract void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res);
+
/**
* @param req Request.
* @param e Error.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
new file mode 100644
index 0000000..be1f0f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class GridNearAtomicMappingResponse extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** */
+ private int part;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> mapping;
+
+ /** */
+ private long futId;
+
+ /**
+ *
+ */
+ public GridNearAtomicMappingResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param part Partition.
+ * @param futId Future ID.
+ * @param mapping Mapping.
+ */
+ GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping) {
+ assert part >= 0 : part;
+
+ this.cacheId = cacheId;
+ this.part = part;
+ this.futId = futId;
+ this.mapping = mapping;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return part;
+ }
+
+ /**
+ * @return Mapping.
+ */
+ public List<UUID> mapping() {
+ return mapping;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -47;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 6;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicMappingResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicMappingResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 2016c98..8bfbe72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -217,6 +217,29 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (cctx.discovery().node(dhtNodeId) != null)
mapping.add(dhtNodeId);
}
+
+ if (rcvd != null)
+ mapping.removeAll(rcvd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
+ GridCacheReturn opRes0 = null;
+
+ synchronized (mux) {
+ if (futId == null || futId != res.futureId())
+ return;
+
+ if (mapping == null) {
+ initMapping(res.mapping());
+
+ if (mapping.isEmpty() && opRes != null)
+ opRes0 = opRes;
+ }
+ }
+
+ if (opRes0 != null)
+ onDone(opRes0);
}
/** {@inheritDoc} */
@@ -251,11 +274,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (opRes == null && res.hasResult())
opRes = res.result();
- if (mapping.isEmpty() && opRes != null) {
+ if (mapping.isEmpty() && opRes != null)
opRes0 = opRes;
-
- futId = null;
- }
}
if (opRes0 != null)
@@ -324,12 +344,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
else
opRes = ret;
- if (res.mapping() != null) {
+ if (res.mapping() != null)
initMapping(res.mapping());
-
- if (rcvd != null)
- mapping.removeAll(rcvd);
- }
else
mapping = Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7b573b1..e135000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -256,6 +256,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
+ @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) {
throw new UnsupportedOperationException();
}
@@ -591,8 +596,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (locUpdate != null) {
cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicFullUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ new GridDhtAtomicCache.UpdateReplyClosure() {
+ @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res, false);
}
});
@@ -615,6 +620,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
void map(AffinityTopologyVersion topVer,
Long futId,
@Nullable Collection<KeyCacheObject> remapKeys) {
+ if (true) {
+ onDone(new IgniteCheckedException("Failed"));
+
+ return;
+ }
+
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {