You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/24 04:41:02 UTC
[4/4] ignite git commit: Merge branches 'ignite-843' and 'master' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-843
Merge branches 'ignite-843' and 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-843
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b9cd1844
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b9cd1844
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b9cd1844
Branch: refs/heads/ignite-843
Commit: b9cd18447c7636bbaa4fe0f78fb05e73f56b01e1
Parents: 6b7593d
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Aug 24 09:41:02 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Aug 24 09:41:04 2015 +0700
----------------------------------------------------------------------
examples/config/example-cache.xml | 2 -
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../store/jdbc/CacheAbstractJdbcStore.java | 45 +-
.../cache/store/jdbc/CacheJdbcPojoStore.java | 32 +-
.../store/jdbc/dialect/BasicJdbcDialect.java | 3 +
.../cache/store/jdbc/dialect/DB2Dialect.java | 3 +
.../cache/store/jdbc/dialect/H2Dialect.java | 3 +
.../cache/store/jdbc/dialect/JdbcDialect.java | 3 +-
.../cache/store/jdbc/dialect/MySQLDialect.java | 3 +
.../cache/store/jdbc/dialect/OracleDialect.java | 3 +
.../store/jdbc/dialect/SQLServerDialect.java | 3 +
.../cluster/ClusterTopologyException.java | 18 +
.../ignite/internal/MarshallerContextImpl.java | 24 +-
.../ClusterTopologyCheckedException.java | 18 +
.../CachePartialUpdateCheckedException.java | 11 +-
.../processors/cache/GridCacheAdapter.java | 85 ++-
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 1 -
.../cache/GridCacheSharedContext.java | 17 +
.../processors/cache/GridCacheUtils.java | 23 +
.../distributed/GridDistributedCacheEntry.java | 11 +-
.../dht/GridClientPartitionTopology.java | 20 +
.../distributed/dht/GridDhtCacheAdapter.java | 12 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 12 +-
.../dht/GridDhtPartitionTopology.java | 7 +
.../dht/GridDhtPartitionTopologyImpl.java | 20 +
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 182 ++++++-
.../dht/GridDhtTxPrepareResponse.java | 42 +-
.../dht/GridPartitionedGetFuture.java | 104 ++--
.../dht/atomic/GridDhtAtomicCache.java | 16 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 3 +
.../dht/colocated/GridDhtColocatedCache.java | 19 +-
.../colocated/GridDhtColocatedLockFuture.java | 8 +-
.../GridDhtPartitionsExchangeFuture.java | 40 +-
.../distributed/near/GridNearAtomicCache.java | 6 +-
.../distributed/near/GridNearCacheAdapter.java | 15 +-
.../distributed/near/GridNearCacheEntry.java | 10 +-
.../distributed/near/GridNearGetFuture.java | 120 +++--
.../distributed/near/GridNearLockFuture.java | 12 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 13 +-
.../GridNearPessimisticTxPrepareFuture.java | 9 +-
.../near/GridNearTransactionalCache.java | 9 +-
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../near/GridNearTxPrepareResponse.java | 3 -
.../cache/local/GridLocalCacheEntry.java | 4 +-
.../local/atomic/GridLocalAtomicCache.java | 17 +-
.../cache/transactions/IgniteInternalTx.java | 2 +-
.../cache/transactions/IgniteTxAdapter.java | 19 +-
.../cache/transactions/IgniteTxEntry.java | 18 +
.../cache/transactions/IgniteTxHandler.java | 5 +-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../service/GridServiceProcessor.java | 5 +
.../ignite/internal/util/IgniteUtils.java | 10 +-
.../ignite/internal/util/lang/GridFunc.java | 14 +
.../TcpDiscoveryMulticastIpFinder.java | 38 ++
.../config/store/jdbc/ignite-type-metadata.xml | 8 +
.../store/jdbc/CacheJdbcPojoStoreTest.java | 33 +-
...eJdbcStoreAbstractMultithreadedSelfTest.java | 16 +-
.../ignite/cache/store/jdbc/model/Person.java | 26 +-
.../cache/CrossCacheTxRandomOperationsTest.java | 534 +++++++++++++++++++
...teAtomicCacheEntryProcessorNodeJoinTest.java | 32 ++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 225 ++++++++
.../IgniteCacheTopologySafeGetSelfTest.java | 218 ++++++++
.../GridCacheMultiNodeLockAbstractTest.java | 41 +-
.../GridCacheTransformEventSelfTest.java | 2 +
.../IgniteCacheCrossCacheTxFailoverTest.java | 433 +++++++++++++++
.../IgniteCachePutRetryAbstractSelfTest.java | 1 +
...gniteCachePutRetryTransactionalSelfTest.java | 187 +++++++
.../near/GridCacheNearOnlyTopologySelfTest.java | 4 +-
.../near/GridCacheNearTxForceKeyTest.java | 76 +++
...idCachePartitionedHitsAndMissesSelfTest.java | 20 +-
...idCachePartitionedMultiNodeLockSelfTest.java | 8 +-
...ridCacheReplicatedMultiNodeLockSelfTest.java | 8 +-
.../lru/LruNearEvictionPolicySelfTest.java | 29 +-
.../LruNearOnlyNearEvictionPolicySelfTest.java | 55 +-
.../OptimizedMarshallerNodeFailoverTest.java | 97 +++-
.../IgniteCacheFailoverTestSuite.java | 2 +
.../IgniteCacheFailoverTestSuite2.java | 2 +
.../testsuites/IgniteCacheTestSuite2.java | 5 +
.../ignite/schema/parser/DbMetadataReader.java | 41 +-
.../parser/dialect/DB2MetadataDialect.java | 3 +-
.../parser/dialect/DatabaseMetadataDialect.java | 13 +-
.../parser/dialect/JdbcMetadataDialect.java | 129 +++--
.../parser/dialect/MySQLMetadataDialect.java | 57 ++
.../parser/dialect/OracleMetadataDialect.java | 111 ++--
.../ignite/schema/model/PojoDescriptor.java | 6 +-
.../ignite/schema/model/SchemaDescriptor.java | 61 +++
.../schema/parser/DatabaseMetadataParser.java | 26 +-
.../org/apache/ignite/schema/ui/Controls.java | 25 +-
.../ignite/schema/ui/SchemaImportApp.java | 157 +++++-
.../schema/test/AbstractSchemaImportTest.java | 10 +-
.../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 14 +-
93 files changed, 3437 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/examples/config/example-cache.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-cache.xml b/examples/config/example-cache.xml
index 6d1d0da..98e1a71 100644
--- a/examples/config/example-cache.xml
+++ b/examples/config/example-cache.xml
@@ -37,8 +37,6 @@
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
- <property name="localHost" value="127.0.0.1" />
-
<property name="cacheConfiguration">
<list>
<!-- Partitioned cache example configuration (Atomic mode). -->
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 7e96b29..7c808df 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -354,6 +354,9 @@ public final class IgniteSystemProperties {
/** Number of cache operation retries in case of topology exceptions. */
public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
+ /** Number of times pending cache objects will be dumped to the log in case of partition exchange timeout. */
+ public static final String IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD = "IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index b1e223b..b2be8c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -355,6 +355,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @throws SQLException If a database access error occurs or this method is called.
*/
protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException {
+ Object val = rs.getObject(colIdx);
+
+ if (val == null)
+ return null;
+
if (type == int.class)
return rs.getInt(colIdx);
@@ -364,7 +369,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
if (type == double.class)
return rs.getDouble(colIdx);
- if (type == boolean.class)
+ if (type == boolean.class || type == Boolean.class)
return rs.getBoolean(colIdx);
if (type == byte.class)
@@ -378,31 +383,23 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
if (type == Integer.class || type == Long.class || type == Double.class ||
type == Byte.class || type == Short.class || type == Float.class) {
- Object val = rs.getObject(colIdx);
-
- if (val != null) {
- Number num = (Number)val;
-
- if (type == Integer.class)
- return num.intValue();
- else if (type == Long.class)
- return num.longValue();
- else if (type == Double.class)
- return num.doubleValue();
- else if (type == Byte.class)
- return num.byteValue();
- else if (type == Short.class)
- return num.shortValue();
- else if (type == Float.class)
- return num.floatValue();
- }
- else
- return EMPTY_COLUMN_VALUE;
+ Number num = (Number)val;
+
+ if (type == Integer.class)
+ return num.intValue();
+ else if (type == Long.class)
+ return num.longValue();
+ else if (type == Double.class)
+ return num.doubleValue();
+ else if (type == Byte.class)
+ return num.byteValue();
+ else if (type == Short.class)
+ return num.shortValue();
+ else if (type == Float.class)
+ return num.floatValue();
}
- Object val = rs.getObject(colIdx);
-
- if (type == UUID.class && val != null) {
+ if (type == UUID.class) {
if (val instanceof UUID)
return val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index 7b78bda..1ff170e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -99,8 +99,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
getters.put(field.getJavaName(), cls.getMethod("is" + prop));
}
catch (NoSuchMethodException e) {
- throw new CacheException("Failed to find getter in POJO class [class name=" + clsName +
- ", property=" + field.getJavaName() + "]", e);
+ throw new CacheException("Failed to find getter in POJO class [clsName=" + clsName +
+ ", prop=" + field.getJavaName() + "]", e);
}
}
@@ -108,8 +108,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType()));
}
catch (NoSuchMethodException e) {
- throw new CacheException("Failed to find setter in POJO class [class name=" + clsName +
- ", property=" + field.getJavaName() + "]", e);
+ throw new CacheException("Failed to find setter in POJO class [clsName=" + clsName +
+ ", prop=" + field.getJavaName() + "]", e);
}
}
}
@@ -167,15 +167,25 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
Object obj = mc.ctor.newInstance();
for (CacheTypeFieldMetadata field : fields) {
- Method setter = mc.setters.get(field.getJavaName());
+ String fldJavaName = field.getJavaName();
+
+ Method setter = mc.setters.get(fldJavaName);
if (setter == null)
- throw new CacheLoaderException("Failed to find setter in POJO class [class name=" + typeName +
- ", property=" + field.getJavaName() + "]");
+ throw new IllegalStateException("Failed to find setter in POJO class [clsName=" + typeName +
+ ", prop=" + fldJavaName + "]");
+
+ String fldDbName = field.getDatabaseName();
- Integer colIdx = loadColIdxs.get(field.getDatabaseName());
+ Integer colIdx = loadColIdxs.get(fldDbName);
- setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
+ try {
+ setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
+ }
+ catch (Exception e) {
+ throw new IllegalStateException("Failed to set property in POJO class [clsName=" + typeName +
+ ", prop=" + fldJavaName + ", col=" + colIdx + ", dbName=" + fldDbName + "]", e);
+ }
}
return (R)obj;
@@ -204,8 +214,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
Method getter = mc.getters.get(fieldName);
if (getter == null)
- throw new CacheLoaderException("Failed to find getter in POJO class [class name=" + typeName +
- ", property=" + fieldName + "]");
+ throw new CacheLoaderException("Failed to find getter in POJO class [clsName=" + typeName +
+ ", prop=" + fieldName + "]");
return getter.invoke(obj);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index d0dd6f4..b43c7d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -26,6 +26,9 @@ import java.util.*;
* Basic implementation of dialect based on JDBC specification.
*/
public class BasicJdbcDialect implements JdbcDialect {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** Default max query parameters count. */
protected static final int DFLT_MAX_PARAMS_CNT = 2000;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
index fe1d876..2a08557 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
@@ -25,6 +25,9 @@ import java.util.*;
* A dialect compatible with the IBM DB2 database.
*/
public class DB2Dialect extends BasicJdbcDialect {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** {@inheritDoc} */
@Override public boolean hasMerge() {
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
index a97e144..8091e1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
@@ -25,6 +25,9 @@ import java.util.*;
* A dialect compatible with the H2 database.
*/
public class H2Dialect extends BasicJdbcDialect {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** {@inheritDoc} */
@Override public boolean hasMerge() {
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
index be1cc67..32adcc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
@@ -17,12 +17,13 @@
package org.apache.ignite.cache.store.jdbc.dialect;
+import java.io.*;
import java.util.*;
/**
* Represents a dialect of SQL implemented by a particular RDBMS.
*/
-public interface JdbcDialect {
+public interface JdbcDialect extends Serializable {
/**
* Construct select count query.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
index df16841..def2fe7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
@@ -25,6 +25,9 @@ import java.util.*;
* A dialect compatible with the MySQL database.
*/
public class MySQLDialect extends BasicJdbcDialect {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** {@inheritDoc} */
@Override public boolean hasMerge() {
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
index 351f10a..e155fb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
@@ -25,6 +25,9 @@ import java.util.*;
* A dialect compatible with the Oracle database.
*/
public class OracleDialect extends BasicJdbcDialect {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** {@inheritDoc} */
@Override public boolean hasMerge() {
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
index e781e98..7fdda6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
@@ -25,6 +25,9 @@ import java.util.*;
* A dialect compatible with the Microsoft SQL Server database.
*/
public class SQLServerDialect extends BasicJdbcDialect {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** {@inheritDoc} */
@Override public boolean hasMerge() {
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
index d28c409..61bc367 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cluster;
import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
/**
@@ -27,6 +28,9 @@ public class ClusterTopologyException extends IgniteException {
/** */
private static final long serialVersionUID = 0L;
+ /** Retry ready future. */
+ private transient IgniteFuture<?> readyFut;
+
/**
* Creates new topology exception with given error message.
*
@@ -46,4 +50,18 @@ public class ClusterTopologyException extends IgniteException {
public ClusterTopologyException(String msg, @Nullable Throwable cause) {
super(msg, cause);
}
+
+ /**
+ * @return Retry ready future.
+ */
+ public IgniteFuture<?> retryReadyFuture() {
+ return readyFut;
+ }
+
+ /**
+ * @param readyFut Retry ready future.
+ */
+ public void retryReadyFuture(IgniteFuture<?> readyFut) {
+ this.readyFut = readyFut;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 9f7c983..dc0fd57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.*;
@@ -135,7 +136,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping).");
}
- String clsName = cache0.get(id);
+ String clsName = cache0.getTopologySafe(id);
if (clsName == null) {
File file = new File(workDir, id + ".classname");
@@ -177,18 +178,21 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
throws CacheEntryListenerException {
for (CacheEntryEvent<? extends Integer, ? extends String> evt : events) {
- assert evt.getOldValue() == null : "Received non-null old value for system marshaller cache: " + evt;
+ assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()):
+ "Received cache entry update for system marshaller cache: " + evt;
- File file = new File(workDir, evt.getKey() + ".classname");
+ if (evt.getOldValue() == null) {
+ File file = new File(workDir, evt.getKey() + ".classname");
- try (Writer writer = new FileWriter(file)) {
- writer.write(evt.getValue());
+ try (Writer writer = new FileWriter(file)) {
+ writer.write(evt.getValue());
- writer.flush();
- }
- catch (IOException e) {
- U.error(log, "Failed to write class name to file [id=" + evt.getKey() +
- ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e);
+ writer.flush();
+ }
+ catch (IOException e) {
+ U.error(log, "Failed to write class name to file [id=" + evt.getKey() +
+ ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
index 8f985b4..2d7b0de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.cluster;
import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
import org.jetbrains.annotations.*;
/**
@@ -27,6 +28,9 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
/** */
private static final long serialVersionUID = 0L;
+ /** Retry ready future. */
+ private transient IgniteInternalFuture<?> readyFut;
+
/**
* Creates new topology exception with given error message.
*
@@ -46,4 +50,18 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
public ClusterTopologyCheckedException(String msg, @Nullable Throwable cause) {
super(msg, cause);
}
+
+ /**
+ * @return Retry ready future.
+ */
+ public IgniteInternalFuture<?> retryReadyFuture() {
+ return readyFut;
+ }
+
+ /**
+ * @param readyFut Retry ready future.
+ */
+ public void retryReadyFuture(IgniteInternalFuture<?> readyFut) {
+ this.readyFut = readyFut;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
index c2259df..ab38e5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
@@ -47,8 +47,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
* Gets collection of failed keys.
* @return Collection of failed keys.
*/
- public <K> Collection<K> failedKeys() {
- return (Collection<K>)failedKeys;
+ @SuppressWarnings("unchecked")
+ public synchronized <K> Collection<K> failedKeys() {
+ return new LinkedHashSet<>((Collection<K>)failedKeys);
}
/**
@@ -56,7 +57,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
* @param err Error.
* @param topVer Topology version for failed update.
*/
- public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
+ public synchronized void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
if (topVer != null) {
AffinityTopologyVersion topVer0 = this.topVer;
@@ -72,7 +73,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
/**
* @return Topology version.
*/
- public AffinityTopologyVersion topologyVersion() {
+ public synchronized AffinityTopologyVersion topologyVersion() {
return topVer;
}
@@ -80,7 +81,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
* @param failedKeys Failed keys.
* @param err Error.
*/
- public void add(Collection<?> failedKeys, Throwable err) {
+ public synchronized void add(Collection<?> failedKeys, Throwable err) {
add(failedKeys, err, null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 177dcfb..7adea2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -526,7 +526,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*subj id*/null,
/*task name*/null,
/*deserialize portable*/false,
- /*skip values*/true
+ /*skip values*/true,
+ /*can remap*/true
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
@Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
Map<K, V> map = fut.get();
@@ -560,7 +561,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*subj id*/null,
/*task name*/null,
/*deserialize portable*/false,
- /*skip values*/true
+ /*skip values*/true,
+ /*can remap*/true
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
@Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
Map<K, V> kvMap = fut.get();
@@ -894,7 +896,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet() {
- return entrySet((CacheEntryPredicate[]) null);
+ return entrySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@@ -919,12 +921,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
- return primaryKeySet((CacheEntryPredicate[]) null);
+ return primaryKeySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@Override public Collection<V> values() {
- return values((CacheEntryPredicate[]) null);
+ return values((CacheEntryPredicate[])null);
}
/**
@@ -1210,22 +1212,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Override public V getForcePrimary(K key) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
- return getAllAsync(F.asList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false)
- .get().get(key);
+ return getAllAsync(
+ F.asList(key),
+ /*force primary*/true,
+ /*skip tx*/false,
+ /*cached entry*/null,
+ /*subject id*/null,
+ taskName,
+ /*deserialize cache objects*/true,
+ /*skip values*/false,
+ /*can remap*/true
+ ).get().get(key);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) {
String taskName = ctx.kernalContext().job().currentTaskName();
- return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
- taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
+ return getAllAsync(
+ Collections.singletonList(key),
+ /*force primary*/true,
+ /*skip tx*/false,
+ null,
+ null,
+ taskName,
+ true,
+ false,
+ /*can remap*/true
+ ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
@Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
}
+ /**
+ * Gets value without waiting for topology changes.
+ *
+ * @param key Key.
+ * @return Value.
+ * @throws IgniteCheckedException If failed.
+ */
+ public V getTopologySafe(K key) throws IgniteCheckedException {
+ String taskName = ctx.kernalContext().job().currentTaskName();
+
+ return getAllAsync(
+ F.asList(key),
+ /*force primary*/false,
+ /*skip tx*/false,
+ /*cached entry*/null,
+ /*subject id*/null,
+ taskName,
+ /*deserialize cache objects*/true,
+ /*skip values*/false,
+ /*can remap*/false
+ ).get().get(key);
+ }
+
/** {@inheritDoc} */
@Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
return getAllOutTxAsync(keys).get();
@@ -1242,7 +1285,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
null,
taskName,
!ctx.keepPortable(),
- false);
+ /*skip values*/false,
+ /*can remap*/true);
}
/**
@@ -1577,7 +1621,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1592,7 +1637,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
deserializePortable,
forcePrimary,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
- skipVals);
+ skipVals,
+ canRemap);
}
/**
@@ -1618,7 +1664,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean deserializePortable,
final boolean forcePrimary,
@Nullable IgniteCacheExpiryPolicy expiry,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -1633,7 +1680,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
deserializePortable,
expiry,
skipVals,
- false);
+ false,
+ canRemap);
}
/**
@@ -1656,7 +1704,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiry,
final boolean skipVals,
- final boolean keepCacheObjects
+ final boolean keepCacheObjects,
+ boolean canRemap
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1679,7 +1728,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
assert keys != null;
final AffinityTopologyVersion topVer = tx == null
- ? ctx.affinity().affinityTopologyVersion()
+ ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion())
: tx.topologyVersion();
final Map<K1, V1> map = new GridLeanMap<>(keys.size());
@@ -4465,7 +4514,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
null,
taskName,
deserializePortable,
- false);
+ false,
+ /*can remap*/true);
}
/**
@@ -4693,6 +4743,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ *
*/
public void execute() {
tx = ctx.tm().newTx(
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 55669a7..fd71ab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -134,7 +134,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
memoryMode = cctx.config().getMemoryMode();
- plcEnabled = plc != null && memoryMode != OFFHEAP_TIERED;
+ plcEnabled = plc != null && (cctx.isNear() || memoryMode != OFFHEAP_TIERED);
filter = cfg.getEvictionFilter();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 84e4dc2..0ef190e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -493,7 +493,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.version(),
null,
null,
- null,
null);
res.error(req.classError());
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 4075d79..cc661d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -531,6 +531,23 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * Gets ready future for the next affinity topology version (used in cases when a node leaves grid).
+ *
+ * @param curVer Current topology version (before a node left grid).
+ * @return Ready future.
+ */
+ public IgniteInternalFuture<?> nextAffinityReadyFuture(AffinityTopologyVersion curVer) {
+ if (curVer == null)
+ return null;
+
+ AffinityTopologyVersion nextVer = new AffinityTopologyVersion(curVer.topologyVersion() + 1);
+
+ IgniteInternalFuture<?> fut = exchMgr.affinityReadyFuture(nextVer);
+
+ return fut == null ? new GridFinishedFuture<>() : fut;
+ }
+
+ /**
* @param tx Transaction to check.
* @param activeCacheIds Active cache IDs.
* @param cacheCtx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index a313e3d..bf55f59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1674,6 +1674,29 @@ public class GridCacheUtils {
}
/**
+ * @param partsMap Cache ID to partition IDs collection map.
+ * @return Cache ID to partition ID array map.
+ */
+ public static Map<Integer, int[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
+ Map<Integer, int[]> res = new HashMap<>(partsMap.size());
+
+ for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
+ Set<Integer> parts = entry.getValue();
+
+ int[] partsArray = new int[parts.size()];
+
+ int idx = 0;
+
+ for (Integer part : parts)
+ partsArray[idx++] = part;
+
+ res.put(entry.getKey(), partsArray);
+ }
+
+ return res;
+ }
+
+ /**
* Stops store session listeners.
*
* @param ctx Kernal context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index bd72764..59d75be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
@@ -68,6 +69,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
*
* @param threadId Owning thread ID.
* @param ver Lock version.
+ * @param topVer Topology version.
* @param timeout Timeout to acquire lock.
* @param reenter Reentry flag.
* @param tx Transaction flag.
@@ -78,6 +80,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
@Nullable public GridCacheMvccCandidate addLocal(
long threadId,
GridCacheVersion ver,
+ AffinityTopologyVersion topVer,
long timeout,
boolean reenter,
boolean tx,
@@ -105,6 +108,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
cand = mvcc.addLocal(this, threadId, ver, timeout, reenter, tx, implicitSingle);
+ if (cand != null)
+ cand.topologyVersion(topVer);
+
owner = mvcc.anyOwner();
boolean emptyAfter = mvcc.isEmpty();
@@ -732,6 +738,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
return addLocal(
tx.threadId(),
tx.xidVersion(),
+ tx.topologyVersion(),
timeout,
false,
true,
@@ -828,8 +835,10 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
// Allow next lock in the thread to proceed.
if (!cand.used()) {
+ GridCacheContext cctx0 = cand.parent().context();
+
GridDistributedCacheEntry e =
- (GridDistributedCacheEntry)cctx.cache().peekEx(cand.key());
+ (GridDistributedCacheEntry)cctx0.cache().peekEx(cand.key());
if (e != null)
e.recheck();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c3f3e7f..531678e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
lock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 22a5287..49fbc5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -539,7 +539,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -552,7 +553,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
deserializePortable,
forcePrimary,
null,
- skipVals);
+ skipVals,
+ canRemap);
}
/**
@@ -570,7 +572,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
@Nullable UUID subjId,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
return getAllAsync0(keys,
readThrough,
@@ -580,7 +583,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
false,
expiry,
skipVals,
- /*keep cache objects*/true);
+ /*keep cache objects*/true,
+ canRemap);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 742fbfe..9005541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -349,12 +349,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
}
else {
if (tx == null) {
- fut = cache().getDhtAllAsync(keys.keySet(),
+ fut = cache().getDhtAllAsync(
+ keys.keySet(),
readThrough,
subjId,
taskName,
expiryPlc,
- skipVals);
+ skipVals,
+ /*can remap*/true);
}
else {
fut = tx.getAllAsync(cctx,
@@ -387,12 +389,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
}
else {
if (tx == null) {
- return cache().getDhtAllAsync(keys.keySet(),
+ return cache().getDhtAllAsync(
+ keys.keySet(),
readThrough,
subjId,
taskName,
expiryPlc,
- skipVals);
+ skipVals,
+ /*can remap*/true);
}
else {
return tx.getAllAsync(cctx,
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..7b08510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology {
public GridDhtPartitionMap localPartitionMap();
/**
+ * @param nodeId Node ID.
+ * @param part Partition.
+ * @return Partition state.
+ */
+ public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+ /**
* @return Current update sequence.
*/
public long updateSequence();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index facf329..8973dcd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -638,6 +638,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6a72c89..7da6e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
* @return Future that will be completed when locks are acquired.
*/
public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
- @Nullable Iterable<IgniteTxEntry> reads,
- @Nullable Iterable<IgniteTxEntry> writes,
+ @Nullable Collection<IgniteTxEntry> reads,
+ @Nullable Collection<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
long msgId,
IgniteUuid nearMiniId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 9bd5de2..2071f8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Keys that should be locked. */
private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ /** Force keys future for correct transforms. */
+ private IgniteInternalFuture<?> forceKeysFut;
+
/** Locks ready flag. */
private volatile boolean locksReady;
@@ -291,14 +294,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
- if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+ if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
+ CacheObject val;
+
cached.unswap(retVal);
boolean readThrough = (retVal || hasFilters) &&
cacheCtx.config().isLoadPreviousValue() &&
!txEntry.skipStore();
- CacheObject val = cached.innerGet(
+ val = cached.innerGet(
tx,
/*swap*/true,
readThrough,
@@ -312,10 +317,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
null,
null);
- if (retVal) {
+ if (retVal || txEntry.op() == TRANSFORM) {
if (!F.isEmpty(txEntry.entryProcessors())) {
invoke = true;
+ if (txEntry.hasValue())
+ val = txEntry.value();
+
KeyCacheObject key = txEntry.key();
Object procRes = null;
@@ -339,12 +347,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
}
- if (err != null || procRes != null)
- ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
- else
- ret.invokeResult(true);
+ txEntry.entryProcessorCalculatedValue(val);
+
+ if (retVal) {
+ if (err != null || procRes != null)
+ ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
+ else
+ ret.invokeResult(true);
+ }
}
- else
+ else if (retVal)
ret.value(cacheCtx, val);
}
@@ -362,7 +374,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
ret.success(false);
}
else
- ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue());
+ ret.success(txEntry.op() != DELETE || cached.hasValue());
}
}
catch (IgniteCheckedException e) {
@@ -466,7 +478,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
private boolean mapIfLocked() {
if (checkLocks()) {
- prepare0();
+ if (!mapped.compareAndSet(false, true))
+ return false;
+
+ if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
+ prepare0();
+ else {
+ forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ try {
+ f.get();
+
+ prepare0();
+ }
+ catch (IgniteCheckedException e) {
+ onError(e);
+ }
+ }
+ });
+ }
return true;
}
@@ -574,13 +604,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
// Send reply back to originating near node.
Throwable prepErr = err.get();
+ assert F.isEmpty(tx.invalidPartitions());
+
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
tx.nearXidVersion(),
tx.colocated() ? tx.xid() : tx.nearFutureId(),
nearMiniId == null ? tx.xid() : nearMiniId,
tx.xidVersion(),
tx.writeVersion(),
- tx.invalidPartitions(),
ret,
prepErr,
null);
@@ -708,7 +739,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param writes Write entries.
* @param txNodes Transaction nodes mapping.
*/
- public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes,
+ @SuppressWarnings("TypeMayBeWeakened")
+ public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes) {
if (tx.empty()) {
tx.setRollbackOnly();
@@ -720,6 +752,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
this.writes = writes;
this.txNodes = txNodes;
+ if (!F.isEmpty(writes)) {
+ Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
+
+ for (IgniteTxEntry entry : writes)
+ forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+
+ forceKeysFut = forceRebalanceKeys(forceKeys);
+ }
+
readyLocks();
mapIfLocked();
@@ -734,12 +775,79 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
/**
+ * Checks if this transaction needs previous value for the given tx entry. Will use passed in map to store
+ * required key or will create new map if passed in map is {@code null}.
*
+ * @param e TX entry.
+ * @param map Map with needed preload keys.
+ * @return Passed in map, or a newly created one if it was {@code null} and a key was added (may remain {@code null}).
*/
- private void prepare0() {
- if (!mapped.compareAndSet(false, true))
- return;
+ private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys(
+ IgniteTxEntry e,
+ Map<Integer, Collection<KeyCacheObject>> map
+ ) {
+ if (retVal || !F.isEmpty(e.entryProcessors())) {
+ if (map == null)
+ map = new HashMap<>();
+
+ Collection<KeyCacheObject> keys = map.get(e.cacheId());
+
+ if (keys == null) {
+ keys = new ArrayList<>();
+
+ map.put(e.cacheId(), keys);
+ }
+
+ keys.add(e.key());
+ }
+
+ return map;
+ }
+
+ /**
+ * @param keysMap Keys to request.
+ * @return Keys request future.
+ */
+ private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
+ if (F.isEmpty(keysMap))
+ return null;
+
+ GridCompoundFuture<Object, Object> compFut = null;
+ IgniteInternalFuture<Object> lastForceFut = null;
+
+ for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) {
+ if (lastForceFut != null && compFut == null) {
+ compFut = new GridCompoundFuture();
+
+ compFut.add(lastForceFut);
+ }
+
+ int cacheId = entry.getKey();
+ Collection<KeyCacheObject> keys = entry.getValue();
+
+ lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+
+ if (compFut != null)
+ compFut.add(lastForceFut);
+ }
+
+ if (compFut != null) {
+ compFut.markInitialized();
+
+ return compFut;
+ }
+ else {
+ assert lastForceFut != null;
+
+ return lastForceFut;
+ }
+ }
+
+ /**
+ *
+ */
+ private void prepare0() {
try {
// We are holding transaction-level locks for entries here, so we can get next write version.
onEntriesLocked();
@@ -956,7 +1064,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private boolean map(
IgniteTxEntry entry,
Map<UUID, GridDistributedTxMapping> futDhtMap,
- Map<UUID, GridDistributedTxMapping> futNearMap) {
+ Map<UUID, GridDistributedTxMapping> futNearMap
+ ) {
if (entry.cached().isLocal())
return false;
@@ -1023,14 +1132,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param locMap Exclude map.
* @return {@code True} if mapped.
*/
- private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
- Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+ private boolean map(
+ IgniteTxEntry entry,
+ Iterable<ClusterNode> nodes,
+ Map<UUID, GridDistributedTxMapping> globalMap,
+ Map<UUID, GridDistributedTxMapping> locMap
+ ) {
boolean ret = false;
if (nodes != null) {
for (ClusterNode n : nodes) {
GridDistributedTxMapping global = globalMap.get(n.id());
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
+
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ CacheObject procVal = entry.entryProcessorCalculatedValue();
+
+ entry.op(procVal == null ? DELETE : UPDATE);
+ entry.value(procVal, true, false);
+ entry.entryProcessors(null);
+ }
+ }
+
if (global == null)
globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
@@ -1194,6 +1320,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
// Process invalid partitions (no need to remap).
+ // Keep this loop for backward compatibility.
if (!F.isEmpty(res.invalidPartitions())) {
for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
IgniteTxEntry entry = it.next();
@@ -1206,6 +1333,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
}
}
+ }
+
+ // Process invalid partitions (no need to remap).
+ if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
+ Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId();
+
+ for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
+ IgniteTxEntry entry = it.next();
+
+ int[] invalidParts = invalidPartsMap.get(entry.cacheId());
+
+ if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) {
+ it.remove();
+
+ if (log.isDebugEnabled())
+ log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() +
+ ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
+ }
+ }
if (dhtMapping.empty()) {
dhtMap.remove(nodeId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 753c117..bcf7f8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -55,6 +55,10 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
@GridDirectCollection(int.class)
private Collection<Integer> invalidParts;
+ /** Invalid partitions by cache ID. */
+ @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+ private Map<Integer, int[]> invalidPartsByCacheId;
+
/** Preload entries. */
@GridDirectCollection(GridCacheEntryInfo.class)
private List<GridCacheEntryInfo> preloadEntries;
@@ -140,6 +144,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @return Map from cache ID to an array of invalid partitions.
+ */
+ public Map<Integer, int[]> invalidPartitionsByCacheId() {
+ return invalidPartsByCacheId;
+ }
+
+ /**
+ * @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions.
+ */
+ public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) {
+ this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId);
+ }
+
+ /**
* Gets preload entries found on backup node.
*
* @return Collection of entry infos need to be preloaded.
@@ -238,18 +256,24 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
case 11:
- if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 12:
+ if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
return false;
@@ -288,7 +312,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 10:
- miniId = reader.readIgniteUuid("miniId");
+ invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
return false;
@@ -296,7 +320,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 11:
- nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -304,6 +328,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 12:
+ nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -323,6 +355,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index bb3673d..6e39672 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -61,7 +61,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
DFLT_MAX_REMAP_CNT);
/** Context. */
- private GridCacheContext<K, V> cctx;
+ private final GridCacheContext<K, V> cctx;
/** Keys. */
private Collection<KeyCacheObject> keys;
@@ -105,6 +105,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
/** Skip values flag. */
private boolean skipVals;
+ /** Flag indicating whether future can be remapped on a newer topology version. */
+ private final boolean canRemap;
+
/**
* @param cctx Context.
* @param keys Keys.
@@ -130,7 +133,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
String taskName,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
@@ -147,6 +151,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
this.taskName = taskName;
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
+ this.canRemap = canRemap;
futId = IgniteUuid.randomUuid();
@@ -160,7 +165,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
* Initializes future.
*/
public void init() {
- AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+ canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
@@ -334,7 +340,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
remapKeys.add(key);
}
- AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+ AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
"not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -461,7 +467,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
}
}
- ClusterNode node = cctx.affinity().primary(key, topVer);
+ ClusterNode node = affinityNode(key, topVer);
if (node == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -522,6 +528,28 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
}
/**
+ * Finds affinity node to send get request to.
+ *
+ * @param key Key to get.
+ * @param topVer Topology version.
+ * @return Affinity node from which the key will be requested.
+ */
+ private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ if (!canRemap) {
+ List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer);
+
+ for (ClusterNode node : nodes) {
+ if (cctx.discovery().alive(node))
+ return node;
+ }
+
+ return null;
+ }
+ else
+ return cctx.affinity().primary(key, topVer);
+ }
+
+ /**
* @param infos Entry infos.
* @return Result map.
*/
@@ -557,14 +585,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
private final IgniteUuid futId = IgniteUuid.randomUuid();
/** Node ID. */
- private ClusterNode node;
+ private final ClusterNode node;
/** Keys. */
@GridToStringInclude
- private LinkedHashMap<KeyCacheObject, Boolean> keys;
+ private final LinkedHashMap<KeyCacheObject, Boolean> keys;
/** Topology version on which this future was mapped. */
- private AffinityTopologyVersion topVer;
+ private final AffinityTopologyVersion topVer;
/** {@code True} if remapped after node left. */
private boolean remapped;
@@ -625,37 +653,45 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
- final AffinityTopologyVersion updTopVer =
- new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+ // Try getting from existing nodes.
+ if (!canRemap) {
+ map(keys.keySet(), F.t(node, keys), topVer);
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
-
- cctx.affinity().affinityReadyFuture(updTopVer).listen(
- new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
- try {
- fut.get();
-
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
-
- onDone(Collections.<K, V>emptyMap());
- }
- catch (IgniteCheckedException e) {
- GridPartitionedGetFuture.this.onDone(e);
+ onDone(Collections.<K, V>emptyMap());
+ }
+ else {
+ final AffinityTopologyVersion updTopVer =
+ new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+ final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+ cctx.kernalContext().config().getNetworkTimeout(),
+ updTopVer,
+ e);
+
+ cctx.affinity().affinityReadyFuture(updTopVer).listen(
+ new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ if (timeout.finish()) {
+ cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+
+ try {
+ fut.get();
+
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
+
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridPartitionedGetFuture.this.onDone(e);
+ }
}
}
}
- }
- );
+ );
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bee34d9..c44b028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -248,7 +248,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable UUID subjId,
final String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ final boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -278,7 +279,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
deserializePortable,
expiryPlc,
skipVals,
- skipStore);
+ skipStore,
+ canRemap);
}
});
}
@@ -870,8 +872,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean deserializePortable,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals,
- boolean skipStore) {
- AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+ boolean skipStore,
+ boolean canRemap
+ ) {
+ AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
+ ctx.shared().exchange().readyAffinityVersion();
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
@@ -971,7 +976,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
taskName,
deserializePortable,
expiry,
- skipVals);
+ skipVals,
+ canRemap);
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index e527f08..07ec808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -231,6 +231,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
nearEnabled = CU.isNearEnabled(cctx);
+ if (!waitTopFut)
+ remapCnt = 1;
+
this.remapCnt = new AtomicInteger(remapCnt);
}