You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/10 12:56:20 UTC
[33/50] [abbrv] ignite git commit: IGNITE-2329: Implemented a bunch
of optimizations: - Garbageless NIO Selector - Get rid of unnecessary
ArrayList allocations in GridCacheMvccManager. - Optimized "force keys"
futures logic.
IGNITE-2329: Implemented a bunch of optimizations:
- Garbageless NIO Selector
- Get rid of unnecessary ArrayList allocations in GridCacheMvccManager.
- Optimized "force keys" futures logic.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/75961eee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/75961eee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/75961eee
Branch: refs/heads/ignite-1786
Commit: 75961eee2513427d94a1c7e0dbb96ac46195544b
Parents: 4210989
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Feb 5 21:13:26 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Feb 5 21:13:26 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 12 +-
.../processors/cache/GridCacheAdapter.java | 37 +-
.../processors/cache/GridCacheMvccManager.java | 42 +-
.../processors/cache/GridCachePreloader.java | 6 +
.../cache/GridCachePreloaderAdapter.java | 5 +
.../processors/cache/GridCacheUtils.java | 21 +-
.../dht/GridClientPartitionTopology.java | 5 +
.../distributed/dht/GridDhtCacheAdapter.java | 72 ++-
.../distributed/dht/GridDhtEmbeddedFuture.java | 13 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 176 ++++---
.../distributed/dht/GridDhtGetSingleFuture.java | 476 +++++++++++++++++++
.../distributed/dht/GridDhtLocalPartition.java | 76 +--
.../distributed/dht/GridDhtPartitionState.java | 2 +-
.../dht/GridDhtPartitionTopology.java | 5 +
.../dht/GridDhtPartitionTopologyImpl.java | 9 +
.../distributed/dht/GridDhtTxPrepareFuture.java | 7 +-
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../dht/colocated/GridDhtColocatedCache.java | 40 +-
.../dht/preloader/GridDhtPreloader.java | 16 +
.../cache/distributed/near/GridNearTxLocal.java | 1 -
.../IgniteCacheObjectProcessorImpl.java | 2 +-
.../util/future/GridCompoundFuture.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 143 +++++-
.../util/nio/GridSelectorNioSessionImpl.java | 2 +-
.../util/nio/SelectedSelectionKeySet.java | 111 +++++
.../org/apache/ignite/lang/IgniteBiTuple.java | 6 +-
.../IgniteTxPreloadAbstractTest.java | 2 +-
.../near/GridCacheNearReadersSelfTest.java | 19 +-
.../apache/ignite/lang/GridTupleSelfTest.java | 42 +-
parent/pom.xml | 1 +
30 files changed, 1119 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index de7c10b..6f07702 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -370,11 +370,21 @@ public final class IgniteSystemProperties {
/**
* Manages {@link OptimizedMarshaller} behavior of {@code serialVersionUID} computation for
* {@link Serializable} classes.
- * */
+ */
public static final String IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID =
"IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID";
/**
+ * If set to {@code true}, then default selected keys set is used inside
+ * {@code GridNioServer} which lead to some extra garbage generation when
+ * processing selected keys.
+ * <p>
+ * Default value is {@code false}. Should be switched to {@code true} if there are
+ * any problems in communication layer.
+ */
+ public static final String IGNITE_NO_SELECTOR_OPTS = "IGNITE_NO_SELECTOR_OPTS";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9f54ddb..84eb0b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -101,7 +101,6 @@ import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1823,7 +1822,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param needVer If {@code true} returns values as tuples containing value and version.
* @return Future.
*/
- public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
+ public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
+ @Nullable final Collection<KeyCacheObject> keys,
final boolean readThrough,
boolean checkTx,
@Nullable final UUID subjId,
@@ -1834,7 +1834,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean keepCacheObjects,
boolean canRemap,
final boolean needVer
- ) {
+ ) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1853,11 +1853,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (tx == null || tx.implicit()) {
try {
- final AffinityTopologyVersion topVer = tx == null
- ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion())
- : tx.topologyVersion();
+ final AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ?
+ ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
+
+ int keysSize = keys.size();
- final Map<K1, V1> map = new GridLeanMap<>(keys.size());
+ final Map<K1, V1> map = keysSize == 1 ?
+ (Map<K1, V1>)new IgniteBiTuple<>() :
+ U.<K1, V1>newHashMap(keysSize);
final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
@@ -1893,7 +1898,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheVersion ver = entry.version();
if (misses == null)
- misses = new GridLeanMap<>();
+ misses = new HashMap<>();
misses.put(key, ver);
}
@@ -1913,7 +1918,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
ctx.evicts().touch(entry, topVer);
- if (keys.size() == 1)
+ if (keysSize == 1)
// Safe to return because no locks are required in READ_COMMITTED mode.
return new GridFinishedFuture<>(map);
}
@@ -2051,17 +2056,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
);
}
- else {
- // If misses is not empty and store is disabled, we should touch missed entries.
- if (misses != null) {
- for (KeyCacheObject key : misses.keySet()) {
- GridCacheEntryEx entry = peekEx(key);
-
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
- }
- }
- }
+ else
+ // Misses can be non-zero only if store is enabled.
+ assert misses == null;
return new GridFinishedFuture<>(map);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c7d1f62..b2c23f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,6 +17,18 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.DiscoveryEvent;
@@ -52,18 +64,6 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
@@ -77,12 +77,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private static final int MAX_REMOVED_LOCKS = 10240;
/** Pending locks per thread. */
- private final ThreadLocal<LinkedList<GridCacheMvccCandidate>> pending =
- new ThreadLocal<LinkedList<GridCacheMvccCandidate>>() {
- @Override protected LinkedList<GridCacheMvccCandidate> initialValue() {
- return new LinkedList<>();
- }
- };
+ private final ThreadLocal<Deque<GridCacheMvccCandidate>> pending = new ThreadLocal<>();
/** Pending near local locks and topology version per thread. */
private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit;
@@ -683,7 +678,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @return Remote candidates.
*/
public Collection<GridCacheMvccCandidate> remoteCandidates() {
- Collection<GridCacheMvccCandidate> rmtCands = new LinkedList<>();
+ Collection<GridCacheMvccCandidate> rmtCands = new ArrayList<>();
for (GridDistributedCacheEntry entry : locked())
rmtCands.addAll(entry.remoteMvccSnapshot());
@@ -697,7 +692,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @return Local candidates.
*/
public Collection<GridCacheMvccCandidate> localCandidates() {
- Collection<GridCacheMvccCandidate> locCands = new LinkedList<>();
+ Collection<GridCacheMvccCandidate> locCands = new ArrayList<>();
for (GridDistributedCacheEntry entry : locked()) {
try {
@@ -726,7 +721,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (cacheCtx.isNear() || cand.singleImplicit())
return true;
- LinkedList<GridCacheMvccCandidate> queue = pending.get();
+ Deque<GridCacheMvccCandidate> queue = pending.get();
+
+ if (queue == null)
+ pending.set(queue = new ArrayDeque<>());
GridCacheMvccCandidate prev = null;
@@ -751,7 +749,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* Reset MVCC context.
*/
public void contextReset() {
- pending.set(new LinkedList<GridCacheMvccCandidate>());
+ pending.set(null);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c8fcb90..be019fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -137,6 +137,12 @@ public interface GridCachePreloader {
public IgniteInternalFuture<Boolean> rebalanceFuture();
/**
+ * @return {@code true} if there is no need to force keys preloading
+ * (e.g. rebalancing has been completed).
+ */
+ public boolean needForceKeys();
+
+ /**
* Requests that preloader sends the request for the key.
*
* @param keys Keys to request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index a1704fc..5d98c6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -93,6 +93,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public boolean needForceKeys() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public void onReconnected() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 8723827..cd21794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -751,23 +751,28 @@ public class GridCacheUtils {
* @param <T> Collection element type.
* @return Reducer.
*/
- public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer() {
+ public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer(final int size) {
return new IgniteReducer<Collection<T>, Collection<T>>() {
- private final Collection<T> ret = new ConcurrentLinkedQueue<>();
+ private List<T> ret;
+
+ @Override public synchronized boolean collect(Collection<T> c) {
+ if (c == null)
+ return true;
+
+ if (ret == null)
+ ret = new ArrayList<>(size);
- @Override public boolean collect(Collection<T> c) {
- if (c != null)
- ret.addAll(c);
+ ret.addAll(c);
return true;
}
- @Override public Collection<T> reduce() {
- return ret;
+ @Override public synchronized Collection<T> reduce() {
+ return ret == null ? Collections.<T>emptyList() : ret;
}
/** {@inheritDoc} */
- @Override public String toString() {
+ @Override public synchronized String toString() {
return "Collection reducer: " + ret;
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index dcfc038..ad4943e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -336,6 +336,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public void releasePartitions(int... parts) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public List<GridDhtLocalPartition> localPartitions() {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5be4e72..8e456e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -698,7 +698,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
@Nullable UUID subjId,
int taskNameHash,
@Nullable IgniteCacheExpiryPolicy expiry,
- boolean skipVals) {
+ boolean skipVals
+ ) {
GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
msgId,
reader,
@@ -718,21 +719,63 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/**
* @param nodeId Node ID.
+ * @param msgId Message ID.
+ * @param key Key.
+ * @param addRdr Add reader flag.
+ * @param readThrough Read through flag.
+ * @param topVer Topology version flag.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash.
+ * @param expiry Expiry.
+ * @param skipVals Skip vals flag.
+ * @return Future for the operation.
+ */
+ private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync(
+ UUID nodeId,
+ long msgId,
+ KeyCacheObject key,
+ boolean addRdr,
+ boolean readThrough,
+ AffinityTopologyVersion topVer,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean skipVals
+ ) {
+ GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>(
+ ctx,
+ msgId,
+ nodeId,
+ key,
+ addRdr,
+ readThrough,
+ /*tx*/null,
+ topVer,
+ subjId,
+ taskNameHash,
+ expiry,
+ skipVals);
+
+ fut.init();
+
+ return fut;
+ }
+
+ /**
+ * @param nodeId Node ID.
* @param req Get request.
*/
protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
assert ctx.affinityNode();
- long ttl = req.accessTtl();
-
- final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
-
- Map<KeyCacheObject, Boolean> map = Collections.singletonMap(req.key(), req.addReader());
+ final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(req.accessTtl());
- IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut =
- getDhtAsync(nodeId,
+ IgniteInternalFuture<GridCacheEntryInfo> fut =
+ getDhtSingleAsync(
+ nodeId,
req.messageId(),
- map,
+ req.key(),
+ req.addReader(),
req.readThrough(),
req.topologyVersion(),
req.subjectId(),
@@ -740,19 +783,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
expiryPlc,
req.skipValues());
- fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
- @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
+ fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
+ @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) {
GridNearSingleGetResponse res;
- GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
- (GridDhtFuture<Collection<GridCacheEntryInfo>>)f;
+ GridDhtFuture<GridCacheEntryInfo> fut = (GridDhtFuture<GridCacheEntryInfo>)f;
try {
- Collection<GridCacheEntryInfo> entries = fut.get();
+ GridCacheEntryInfo info = fut.get();
if (F.isEmpty(fut.invalidPartitions())) {
- GridCacheEntryInfo info = F.first(entries);
-
Message res0 = null;
if (info != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
index 0d10a93..1b9f743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Collections;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiClosure;
@@ -32,10 +31,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
/** */
private static final long serialVersionUID = 0L;
- /** Retries. */
- @GridToStringInclude
- private Collection<Integer> invalidParts;
-
/**
* @param c Closure.
* @param embedded Embedded.
@@ -45,8 +40,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
IgniteInternalFuture<B> embedded
) {
super(c, embedded);
-
- invalidParts = Collections.emptyList();
}
/**
@@ -58,17 +51,15 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c
) {
super(embedded, c);
-
- invalidParts = Collections.emptyList();
}
/** {@inheritDoc} */
@Override public Collection<Integer> invalidPartitions() {
- return invalidParts;
+ return Collections.emptyList();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtEmbeddedFuture.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index c926c13..fa753b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -82,7 +83,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
private Map<KeyCacheObject, Boolean> keys;
/** Reserved partitions. */
- private Collection<GridDhtLocalPartition> parts = new HashSet<>();
+ private int[] parts;
/** Future ID. */
private IgniteUuid futId;
@@ -137,7 +138,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals
) {
- super(CU.<GridCacheEntryInfo>collectionsReducer());
+ super(CU.<GridCacheEntryInfo>collectionsReducer(keys.size()));
assert reader != null;
assert !F.isEmpty(keys);
@@ -194,8 +195,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
@Override public boolean onDone(Collection<GridCacheEntryInfo> res, Throwable err) {
if (super.onDone(res, err)) {
// Release all partitions reserved by this future.
- for (GridDhtLocalPartition part : parts)
- part.release();
+ if (parts != null)
+ cctx.topology().releasePartitions(parts);
return true;
}
@@ -209,68 +210,92 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
private void map(final Map<KeyCacheObject, Boolean> keys) {
GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
- if (!F.isEmpty(fut.invalidPartitions())) {
- if (retries == null)
- retries = new HashSet<>();
+ if (fut != null) {
+ if (!F.isEmpty(fut.invalidPartitions())) {
+ if (retries == null)
+ retries = new HashSet<>();
- retries.addAll(fut.invalidPartitions());
- }
+ retries.addAll(fut.invalidPartitions());
+ }
- add(new GridEmbeddedFuture<>(
- new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
- @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
- if (e != null) { // Check error first.
- if (log.isDebugEnabled())
- log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
+ add(new GridEmbeddedFuture<>(
+ new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
+ @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
+ if (e != null) { // Check error first.
+ if (log.isDebugEnabled())
+ log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
- onDone(e);
+ onDone(e);
+ }
+ else
+ map0(keys);
+
+ // Finish this one.
+ return Collections.emptyList();
}
+ },
+ fut));
+ }
+ else
+ map0(keys);
+ }
- Map<KeyCacheObject, Boolean> mappedKeys = null;
+ /**
+ * @param keys Keys to map.
+ */
+ private void map0(Map<KeyCacheObject, Boolean> keys) {
+ Map<KeyCacheObject, Boolean> mappedKeys = null;
- // Assign keys to primary nodes.
- for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
- int part = cctx.affinity().partition(key.getKey());
+ // Assign keys to primary nodes.
+ for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
+ int part = cctx.affinity().partition(key.getKey());
- if (retries == null || !retries.contains(part)) {
- if (!map(key.getKey(), parts)) {
- if (retries == null)
- retries = new HashSet<>();
+ if (retries == null || !retries.contains(part)) {
+ if (!map(key.getKey())) {
+ if (retries == null)
+ retries = new HashSet<>();
- retries.add(part);
+ retries.add(part);
- if (mappedKeys == null) {
- mappedKeys = U.newLinkedHashMap(keys.size());
+ if (mappedKeys == null) {
+ mappedKeys = U.newLinkedHashMap(keys.size());
- for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) {
- if (key1.getKey() == key.getKey())
- break;
+ for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) {
+ if (key1.getKey() == key.getKey())
+ break;
- mappedKeys.put(key.getKey(), key1.getValue());
- }
- }
- }
- else if (mappedKeys != null)
- mappedKeys.put(key.getKey(), key.getValue());
+ mappedKeys.put(key.getKey(), key1.getValue());
}
}
+ }
+ else if (mappedKeys != null)
+ mappedKeys.put(key.getKey(), key.getValue());
+ }
+ }
- // Add new future.
- add(getAsync(mappedKeys == null ? keys : mappedKeys));
+ // Add new future.
+ IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = getAsync(mappedKeys == null ? keys : mappedKeys);
- // Finish this one.
- return Collections.emptyList();
- }
- },
- fut));
+ // Optimization to avoid going through compound future,
+ // if getAsync() has been completed and no other futures added to this
+ // compound future.
+ if (fut.isDone() && futuresSize() == 0) {
+ if (fut.error() != null)
+ onDone(fut.error());
+ else
+ onDone(fut.result());
+
+ return;
+ }
+
+ add(fut);
}
/**
* @param key Key.
- * @param parts Parts to map.
* @return {@code True} if mapped.
*/
- private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts) {
+ private boolean map(KeyCacheObject key) {
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
cache().topology().localPartition(key, false);
@@ -278,10 +303,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
if (part == null)
return false;
- if (!parts.contains(part)) {
+ if (parts == null || !F.contains(parts, part.id())) {
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
- parts.add(part);
+ parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
+
+ parts[parts.length - 1] = part.id();
return true;
}
@@ -422,37 +449,56 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
);
}
+ if (fut.isDone()) {
+ if (fut.error() != null)
+ onDone(fut.error());
+ else
+ return new GridFinishedFuture<>(toEntryInfos(fut.result()));
+ }
+
return new GridEmbeddedFuture<>(
new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() {
- @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) {
+ @Override public Collection<GridCacheEntryInfo> apply(
+ Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e
+ ) {
if (e != null) {
onDone(e);
return Collections.emptyList();
}
- else {
- Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
+ else
+ return toEntryInfos(map);
+ }
+ },
+ fut);
+ }
- for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
- T2<CacheObject, GridCacheVersion> val = entry.getValue();
+ /**
+ * @param map Map to convert.
+ * @return List of infos.
+ */
+ private Collection<GridCacheEntryInfo> toEntryInfos(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) {
+ if (map.isEmpty())
+ return Collections.emptyList();
- assert val != null;
+ Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
- GridCacheEntryInfo info = new GridCacheEntryInfo();
+ for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
+ T2<CacheObject, GridCacheVersion> val = entry.getValue();
- info.cacheId(cctx.cacheId());
- info.key(entry.getKey());
- info.value(skipVals ? null : val.get1());
- info.version(val.get2());
+ assert val != null;
- infos.add(info);
- }
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
- return infos;
- }
- }
- },
- fut);
+ info.cacheId(cctx.cacheId());
+ info.key(entry.getKey());
+ info.value(skipVals ? null : val.get1());
+ info.version(val.get2());
+
+ infos.add(info);
+ }
+
+ return infos;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
new file mode 100644
index 0000000..d9851c7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCacheEntryInfo>
+ implements GridDhtFuture<GridCacheEntryInfo> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Message ID. */
+ private long msgId;
+
+ /** */
+ private UUID reader;
+
+ /** Read through flag. */
+ private boolean readThrough;
+
+ /** Context. */
+ private GridCacheContext<K, V> cctx;
+
+ /** Key. */
+ private KeyCacheObject key;
+
+ /** */
+ private boolean addRdr;
+
+ /** Reserved partitions. */
+ private int part = -1;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Version. */
+ private GridCacheVersion ver;
+
+ /** Topology version .*/
+ private AffinityTopologyVersion topVer;
+
+ /** Transaction. */
+ private IgniteTxLocalEx tx;
+
+ /** Retries because ownership changed. */
+ private Collection<Integer> retries;
+
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name. */
+ private int taskNameHash;
+
+ /** Expiry policy. */
+ private IgniteCacheExpiryPolicy expiryPlc;
+
+ /** Skip values flag. */
+ private boolean skipVals;
+
+ /**
+ * @param cctx Context.
+ * @param msgId Message ID.
+ * @param reader Reader.
+ * @param key Key.
+ * @param addRdr Add reader flag.
+ * @param readThrough Read through flag.
+ * @param tx Transaction.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ */
+ public GridDhtGetSingleFuture(
+ GridCacheContext<K, V> cctx,
+ long msgId,
+ UUID reader,
+ KeyCacheObject key,
+ Boolean addRdr,
+ boolean readThrough,
+ @Nullable IgniteTxLocalEx tx,
+ @NotNull AffinityTopologyVersion topVer,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean skipVals
+ ) {
+ assert reader != null;
+ assert key != null;
+
+ this.reader = reader;
+ this.cctx = cctx;
+ this.msgId = msgId;
+ this.key = key;
+ this.addRdr = addRdr;
+ this.readThrough = readThrough;
+ this.tx = tx;
+ this.topVer = topVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.expiryPlc = expiryPlc;
+ this.skipVals = skipVals;
+
+ futId = IgniteUuid.randomUuid();
+
+ ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class);
+ }
+
+ /**
+ * Initializes future.
+ */
+ void init() {
+ map();
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Future version.
+ */
+ public GridCacheVersion version() {
+ return ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(GridCacheEntryInfo res, Throwable err) {
+ if (super.onDone(res, err)) {
+ // Release all partitions reserved by this future.
+ if (part != -1)
+ cctx.topology().releasePartitions(part);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ *
+ */
+ private void map() {
+ if (cctx.dht().dhtPreloader().needForceKeys()) {
+ GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+ Collections.singleton(key),
+ topVer);
+
+ if (fut != null) {
+ if (F.isEmpty(fut.invalidPartitions())) {
+ if (retries == null)
+ retries = new HashSet<>();
+
+ retries.addAll(fut.invalidPartitions());
+ }
+
+ fut.listen(
+ new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> fut) {
+ Throwable e = fut.error();
+
+ if (e != null) { // Check error first.
+ if (log.isDebugEnabled())
+ log.debug("Failed to request keys from preloader " +
+ "[keys=" + key + ", err=" + e + ']');
+
+ onDone(e);
+ }
+ else
+ map0();
+ }
+ }
+ );
+
+ return;
+ }
+ }
+
+ map0();
+ }
+
+ /**
+ *
+ */
+ private void map0() {
+ // Assign keys to primary nodes.
+ int part = cctx.affinity().partition(key);
+
+ if (retries == null || !retries.contains(part)) {
+ if (!map(key)) {
+ retries = Collections.singleton(part);
+
+ onDone((GridCacheEntryInfo)null);
+
+ return;
+ }
+ }
+
+ getAsync();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Integer> invalidPartitions() {
+ return retries == null ? Collections.<Integer>emptyList() : retries;
+ }
+
+ /**
+ * @param key Key.
+ * @return {@code True} if mapped.
+ */
+ private boolean map(KeyCacheObject key) {
+ GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
+ cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
+ cache().topology().localPartition(key, false);
+
+ if (part == null)
+ return false;
+
+ assert this.part == -1;
+
+ // By reserving, we make sure that partition won't be unloaded while processed.
+ if (part.reserve()) {
+ this.part = part.id();
+
+ return true;
+ }
+ else
+ return false;
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings( {"unchecked", "IfMayBeConditional"})
+ private void getAsync() {
+ assert part != -1;
+
+ String taskName0 = cctx.kernalContext().job().currentTaskName();
+
+ if (taskName0 == null)
+ taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash);
+
+ final String taskName = taskName0;
+
+ IgniteInternalFuture<Boolean> rdrFut = null;
+
+ ClusterNode readerNode = cctx.discovery().node(reader);
+
+ if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+ while (true) {
+ GridDhtCacheEntry e = cache().entryExx(key, topVer);
+
+ try {
+ if (e.obsolete())
+ continue;
+
+ boolean addReader = (!e.deleted() && addRdr && !skipVals);
+
+ if (addReader)
+ e.unswap(false);
+
+ // Register reader. If there are active transactions for this entry,
+ // then will wait for their completion before proceeding.
+ // TODO: GG-4003:
+ // TODO: What if any transaction we wait for actually removes this entry?
+ // TODO: In this case seems like we will be stuck with untracked near entry.
+ // TODO: To fix, check that reader is contained in the list of readers once
+ // TODO: again after the returned future completes - if not, try again.
+ rdrFut = addReader ? e.addReader(reader, msgId, topVer) : null;
+
+ break;
+ }
+ catch (IgniteCheckedException err) {
+ onDone(err);
+
+ return;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry when getting a DHT value: " + e);
+ }
+ finally {
+ cctx.evicts().touch(e, topVer);
+ }
+ }
+ }
+
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
+
+ if (rdrFut == null || rdrFut.isDone()) {
+ if (tx == null) {
+ fut = cache().getDhtAllAsync(
+ Collections.singleton(key),
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
+ }
+ else {
+ fut = tx.getAllAsync(cctx,
+ Collections.singleton(key),
+ /*deserialize binary*/false,
+ skipVals,
+ /*keep cache objects*/true,
+ /*skip store*/!readThrough,
+ false);
+ }
+ }
+ else {
+ rdrFut.listen(
+ new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ Throwable e = fut.error();
+
+ if (e != null) {
+ onDone(e);
+
+ return;
+ }
+
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut0;
+
+ if (tx == null) {
+ fut0 = cache().getDhtAllAsync(
+ Collections.singleton(key),
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
+ }
+ else {
+ fut0 = tx.getAllAsync(cctx,
+ Collections.singleton(key),
+ /*deserialize binary*/false,
+ skipVals,
+ /*keep cache objects*/true,
+ /*skip store*/!readThrough,
+ false
+ );
+ }
+
+ fut0.listen(createGetFutureListener());
+ }
+ }
+ );
+
+ return;
+ }
+
+ if (fut.isDone())
+ onResult(fut);
+ else
+ fut.listen(createGetFutureListener());
+ }
+
+ /**
+ * @return Listener for get future.
+ */
+ @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>
+ createGetFutureListener() {
+ return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut
+ ) {
+ onResult(fut);
+ }
+ };
+ }
+
+ /**
+ * @param fut Completed future to finish this process with.
+ */
+ private void onResult(IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut) {
+ assert fut.isDone();
+
+ if (fut.error() != null)
+ onDone(fut.error());
+ else {
+ try {
+ onDone(toEntryInfo(fut.get()));
+ }
+ catch (IgniteCheckedException e) {
+ assert false; // Should never happen.
+ }
+ }
+ }
+
+ /**
+ * @param map Map to convert.
+ * @return List of infos.
+ */
+ private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) {
+ if (map.isEmpty())
+ return null;
+
+ T2<CacheObject, GridCacheVersion> val = map.get(key);
+
+ assert val != null;
+
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ info.cacheId(cctx.cacheId());
+ info.key(key);
+ info.value(skipVals ? null : val.get1());
+ info.version(val.get2());
+
+ return info;
+ }
+
+ /**
+ * @return DHT cache.
+ */
+ private GridDhtCacheAdapter<K, V> cache() {
+ return (GridDhtCacheAdapter<K, V>)cctx.cache();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index c4312b5..4fc1eaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
@@ -83,8 +82,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
/** State. */
@GridToStringExclude
- private final AtomicStampedReference<GridDhtPartitionState> state =
- new AtomicStampedReference<>(MOVING, 0);
+ private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 32);
/** Rent future. */
@GridToStringExclude
@@ -153,8 +151,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @return {@code false} If such reservation already added.
*/
public boolean addReservation(GridDhtPartitionsReservation r) {
- assert state.getReference() != EVICTED : "we can reserve only active partitions";
- assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation";
+ assert GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32)) != EVICTED :
+ "we can reserve only active partitions";
+ assert (state.get() & 0xFFFF) != 0 : "partition must be already reserved before adding group reservation";
return reservations.addIfAbsent(r);
}
@@ -185,14 +184,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @return Partition state.
*/
public GridDhtPartitionState state() {
- return state.getReference();
+ return GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32));
}
/**
* @return Reservations.
*/
public int reservations() {
- return state.getStamp();
+ return (int)(state.get() & 0xFFFF);
}
/**
@@ -385,14 +384,12 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*/
@Override public boolean reserve() {
while (true) {
- int reservations = state.getStamp();
+ long reservations = state.get();
- GridDhtPartitionState s = state.getReference();
-
- if (s == EVICTED)
+ if ((int)(reservations >> 32) == EVICTED.ordinal())
return false;
- if (state.compareAndSet(s, s, reservations, reservations + 1))
+ if (state.compareAndSet(reservations, reservations + 1))
return true;
}
}
@@ -402,17 +399,15 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*/
@Override public void release() {
while (true) {
- int reservations = state.getStamp();
+ long reservations = state.get();
- if (reservations == 0)
+ if ((int)(reservations & 0xFFFF) == 0)
return;
- GridDhtPartitionState s = state.getReference();
-
- assert s != EVICTED;
+ assert (int)(reservations >> 32) != EVICTED.ordinal();
// Decrement reservations.
- if (state.compareAndSet(s, s, reservations, --reservations)) {
+ if (state.compareAndSet(reservations, --reservations)) {
tryEvict();
break;
@@ -421,23 +416,32 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
/**
+ * @param reservations Current aggregated value.
+ * @param toState State to switch to.
+ * @return {@code true} if cas succeeds.
+ */
+ private boolean casState(long reservations, GridDhtPartitionState toState) {
+ return state.compareAndSet(reservations, (reservations & 0xFFFF) | ((long)toState.ordinal() << 32));
+ }
+
+ /**
* @return {@code True} if transitioned to OWNING state.
*/
boolean own() {
while (true) {
- int reservations = state.getStamp();
+ long reservations = state.get();
- GridDhtPartitionState s = state.getReference();
+ int ord = (int)(reservations >> 32);
- if (s == RENTING || s == EVICTED)
+ if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
return false;
- if (s == OWNING)
+ if (ord == OWNING.ordinal())
return true;
- assert s == MOVING;
+ assert ord == MOVING.ordinal();
- if (state.compareAndSet(MOVING, OWNING, reservations, reservations)) {
+ if (casState(reservations, OWNING)) {
if (log.isDebugEnabled())
log.debug("Owned partition: " + this);
@@ -455,14 +459,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*/
IgniteInternalFuture<?> rent(boolean updateSeq) {
while (true) {
- int reservations = state.getStamp();
+ long reservations = state.get();
- GridDhtPartitionState s = state.getReference();
+ int ord = (int)(reservations >> 32);
- if (s == RENTING || s == EVICTED)
+ if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
return rent;
- if (state.compareAndSet(s, RENTING, reservations, reservations)) {
+ if (casState(reservations, RENTING)) {
if (log.isDebugEnabled())
log.debug("Moved partition to RENTING state: " + this);
@@ -481,9 +485,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @param updateSeq Update sequence.
*/
void tryEvictAsync(boolean updateSeq) {
+ long reservations = state.get();
+
+ int ord = (int)(reservations >> 32);
+
if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
- state.getReference() == RENTING && state.getStamp() == 0 &&
- state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+ ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 &&
+ casState(reservations, EVICTED)) {
if (log.isDebugEnabled())
log.debug("Evicted partition: " + this);
@@ -520,13 +528,17 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*
*/
public void tryEvict() {
- if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
+ long reservations = state.get();
+
+ int ord = (int)(reservations >> 32);
+
+ if (ord != RENTING.ordinal() || (reservations & 0xFFFF) != 0 || groupReserved())
return;
// Attempt to evict partition entries from cache.
clearAll();
- if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+ if (map.isEmpty() && casState(reservations, EVICTED)) {
if (log.isDebugEnabled())
log.debug("Evicted partition: " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
index 7b49369..041f135 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
@@ -52,4 +52,4 @@ public enum GridDhtPartitionState {
public boolean active() {
return this != EVICTED;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index dd06d6f..84889f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -112,6 +112,11 @@ public interface GridDhtPartitionTopology {
throws GridDhtInvalidPartitionException;
/**
+ * @param parts Partitions to release (should be reserved before).
+ */
+ public void releasePartitions(int... parts);
+
+ /**
* @param key Cache key.
* @param create If {@code true}, then partition will be created if it's not there.
* @return Local partition.
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index d6fc8f1..0e579ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -612,6 +612,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public void releasePartitions(int... parts) {
+ assert parts != null;
+ assert parts.length > 0;
+
+ for (int i = 0; i < parts.length; i++)
+ locParts.get(parts[i]).release();
+ }
+
+ /** {@inheritDoc} */
@Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 41b28d5..4c783f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -988,7 +988,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
- if (compFut != null)
+ if (compFut != null && lastForceFut != null)
compFut.add(lastForceFut);
}
@@ -997,11 +997,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return compFut;
}
- else {
- assert lastForceFut != null;
-
+ else
return lastForceFut;
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f6f57ee..6c7bac5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1309,7 +1309,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
- if (forceFut.isDone())
+ if (forceFut == null || forceFut.isDone())
updateAllAsyncInternal0(nodeId, req, completionCb);
else {
forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index dc4b6bd..1a2eb22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -897,28 +897,24 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
// Prevent embedded future creation if possible.
- if (keyFut.isDone()) {
- try {
- // Check for exception.
- keyFut.get();
-
- return lockAllAsync0(cacheCtx,
- tx,
- threadId,
- ver,
- topVer,
- keys,
- txRead,
- retval,
- timeout,
- accessTtl,
- filter,
- skipStore,
- keepBinary);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
+ if (keyFut == null || keyFut.isDone()) {
+ // Check for exception.
+ if (keyFut != null && keyFut.error() != null)
+ return new GridFinishedFuture<>(keyFut.error());
+
+ return lockAllAsync0(cacheCtx,
+ tx,
+ threadId,
+ ver,
+ topVer,
+ keys,
+ txRead,
+ retval,
+ timeout,
+ accessTtl,
+ filter,
+ skipStore,
+ keepBinary);
}
else {
return new GridEmbeddedFuture<>(keyFut,
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index f0054e4..6ec02a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -403,6 +403,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
try {
demandLock.readLock().lock();
+
try {
demander.handleSupplyMessage(idx, id, s);
}
@@ -692,12 +693,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
}
+ /** {@inheritDoc} */
+ @Override public boolean needForceKeys() {
+ if (cctx.rebalanceEnabled()) {
+ IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
+
+ if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result()))
+ return false;
+ }
+
+ return true;
+ }
+
/**
* @param keys Keys to request.
* @return Future for request.
*/
@SuppressWarnings( {"unchecked", "RedundantCast"})
@Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ if (!needForceKeys())
+ return null;
+
final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b7b480e..0853b77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 54dd69e..2e825b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -302,7 +302,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
ClassLoader ldr = ctx.p2pEnabled() ?
IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
- Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
+ Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
return new KeyCacheObjectImpl(val, valBytes);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index c382497..3409341 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -258,7 +258,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
/**
* @return Futures size.
*/
- private int futuresSize() {
+ protected int futuresSize() {
synchronized (futs) {
return futs.size();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index c7679c0..75fa9f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.nio;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -43,10 +44,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -102,6 +103,24 @@ public class GridNioServer<T> {
/** SSL write buf limit. */
private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
+ /** */
+ private static final boolean DISABLE_KEYSET_OPTIMIZATION =
+ IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
+
+ /**
+ *
+ */
+ static {
+ // This is a workaround for JDK bug (NPE in Selector.open()).
+ // http://bugs.sun.com/view_bug.do?bug_id=6427854
+ try {
+ Selector.open().close();
+ }
+ catch (IOException ignored) {
+ // No-op.
+ }
+ }
+
/** Accept worker thread. */
@GridToStringExclude
private final IgniteThread acceptThread;
@@ -184,17 +203,6 @@ public class GridNioServer<T> {
/** Optional listener to monitor outbound message queue size. */
private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
- /** Static initializer ensures single-threaded execution of workaround. */
- static {
- // This is a workaround for JDK bug (NPE in Selector.open()).
- // http://bugs.sun.com/view_bug.do?bug_id=6427854
- try {
- Selector.open().close();
- }
- catch (IOException ignored) {
- }
- }
-
/**
* @param addr Address.
* @param port Port.
@@ -445,10 +453,8 @@ public class GridNioServer<T> {
// Change from 0 to 1 means that worker thread should be waken up.
clientWorkers.get(ses.selectorIndex()).offer(fut);
- IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr;
-
- if (lsnr0 != null)
- lsnr0.apply(ses, msgCnt);
+ if (msgQueueLsnr != null)
+ msgQueueLsnr.apply(ses, msgCnt);
}
/**
@@ -1239,6 +1245,9 @@ public class GridNioServer<T> {
/** Selector to select read events. */
private Selector selector;
+ /** Selected keys. */
+ private SelectedSelectionKeySet selectedKeys;
+
/** Worker index. */
private final int idx;
@@ -1253,7 +1262,7 @@ public class GridNioServer<T> {
throws IgniteCheckedException {
super(gridName, name, log);
- selector = createSelector(null);
+ createSelector();
this.idx = idx;
}
@@ -1262,10 +1271,11 @@ public class GridNioServer<T> {
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
try {
boolean reset = false;
+
while (!closed) {
try {
if (reset)
- selector = createSelector(null);
+ createSelector();
bodyInternal();
}
@@ -1290,6 +1300,50 @@ public class GridNioServer<T> {
}
/**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void createSelector() throws IgniteCheckedException {
+ selectedKeys = null;
+
+ selector = GridNioServer.this.createSelector(null);
+
+ if (DISABLE_KEYSET_OPTIMIZATION)
+ return;
+
+ try {
+ SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
+
+ Class<?> selectorImplClass =
+ Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
+
+ // Ensure the current selector implementation is what we can instrument.
+ if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+ return;
+
+ Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
+ Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
+
+ selectedKeysField.setAccessible(true);
+ publicSelectedKeysField.setAccessible(true);
+
+ selectedKeysField.set(selector, selectedKeySet);
+ publicSelectedKeysField.set(selector, selectedKeySet);
+
+ selectedKeys = selectedKeySet;
+
+ if (log.isDebugEnabled())
+ log.debug("Instrumented an optimized java.util.Set into: " + selector);
+ }
+ catch (Exception e) {
+ selectedKeys = null;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to instrument an optimized java.util.Set into selector [selector=" + selector
+ + ", err=" + e + ']');
+ }
+ }
+
+ /**
* Adds socket channel to the registration queue and wakes up reading thread.
*
* @param req Change request.
@@ -1385,7 +1439,10 @@ public class GridNioServer<T> {
// Wake up every 2 seconds to check if closed.
if (selector.select(2000) > 0) {
// Walk through the ready keys collection and process network events.
- processSelectedKeys(selector.selectedKeys());
+ if (selectedKeys == null)
+ processSelectedKeys(selector.selectedKeys());
+ else
+ processSelectedKeysOptimized(selectedKeys.flip());
}
long now = U.currentTimeMillis();
@@ -1431,10 +1488,58 @@ public class GridNioServer<T> {
* @param keys Selected keys.
* @throws ClosedByInterruptException If this thread was interrupted while reading data.
*/
+ private void processSelectedKeysOptimized(SelectionKey[] keys) throws ClosedByInterruptException {
+ for (int i = 0; ; i ++) {
+ final SelectionKey key = keys[i];
+
+ if (key == null)
+ break;
+
+ // null out entry in the array to allow to have it GC'ed once the Channel close
+ // See https://github.com/netty/netty/issues/2363
+ keys[i] = null;
+
+ // Was key closed?
+ if (!key.isValid())
+ continue;
+
+ GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+
+ assert ses != null;
+
+ try {
+ if (key.isReadable())
+ processRead(key);
+
+ if (key.isValid() && key.isWritable())
+ processWrite(key);
+ }
+ catch (ClosedByInterruptException e) {
+ // This exception will be handled in bodyInternal() method.
+ throw e;
+ }
+ catch (Exception e) {
+ if (!closed)
+ U.warn(log, "Failed to process selector key (will close): " + ses, e);
+
+ close(ses, new GridNioException(e));
+ }
+ }
+ }
+
+ /**
+ * Processes keys selected by a selector.
+ *
+ * @param keys Selected keys.
+ * @throws ClosedByInterruptException If this thread was interrupted while reading data.
+ */
private void processSelectedKeys(Set<SelectionKey> keys) throws ClosedByInterruptException {
if (log.isTraceEnabled())
log.trace("Processing keys in client worker: " + keys.size());
+ if (keys.isEmpty())
+ return;
+
for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {
SelectionKey key = iter.next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index deb7d2b..1241f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -309,4 +309,4 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
@Override public String toString() {
return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java
new file mode 100644
index 0000000..9aa245d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+
+import java.nio.channels.SelectionKey;
+import java.util.AbstractSet;
+import java.util.Iterator;
+
+final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
+
+ private SelectionKey[] keysA;
+ private int keysASize;
+ private SelectionKey[] keysB;
+ private int keysBSize;
+ private boolean isA = true;
+
+ SelectedSelectionKeySet() {
+ keysA = new SelectionKey[1024];
+ keysB = keysA.clone();
+ }
+
+ @Override
+ public boolean add(SelectionKey o) {
+ if (o == null) {
+ return false;
+ }
+
+ if (isA) {
+ int size = keysASize;
+ keysA[size ++] = o;
+ keysASize = size;
+ if (size == keysA.length) {
+ doubleCapacityA();
+ }
+ } else {
+ int size = keysBSize;
+ keysB[size ++] = o;
+ keysBSize = size;
+ if (size == keysB.length) {
+ doubleCapacityB();
+ }
+ }
+
+ return true;
+ }
+
+ private void doubleCapacityA() {
+ SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
+ System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
+ keysA = newKeysA;
+ }
+
+ private void doubleCapacityB() {
+ SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
+ System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
+ keysB = newKeysB;
+ }
+
+ SelectionKey[] flip() {
+ if (isA) {
+ isA = false;
+ keysA[keysASize] = null;
+ keysBSize = 0;
+ return keysA;
+ } else {
+ isA = true;
+ keysB[keysBSize] = null;
+ keysASize = 0;
+ return keysB;
+ }
+ }
+
+ @Override
+ public int size() {
+ if (isA) {
+ return keysASize;
+ } else {
+ return keysBSize;
+ }
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return false;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return false;
+ }
+
+ @Override
+ public Iterator<SelectionKey> iterator() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
index 6098007..89e5f16 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
@@ -250,7 +250,9 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
/** {@inheritDoc} */
@Override public Set<Map.Entry<V1, V2>> entrySet() {
- return Collections.<Entry<V1, V2>>singleton(this);
+ return isEmpty() ?
+ Collections.<Entry<V1,V2>>emptySet() :
+ Collections.<Entry<V1, V2>>singleton(this);
}
/** {@inheritDoc} */
@@ -301,4 +303,4 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
@Override public String toString() {
return S.toString(IgniteBiTuple.class, this);
}
-}
\ No newline at end of file
+}