You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/14 14:43:11 UTC
[4/7] ignite git commit: Merge master into ignite-3477
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ba48e7a,a97b0fe..6bf96b0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -329,17 -329,20 +329,22 @@@ public class GridNearTxLocal extends Gr
final boolean skipVals,
final boolean needVer,
boolean keepBinary,
+ boolean recovery,
+ final ExpiryPolicy expiryPlc,
final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
) {
+ IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
+ accessPolicy(cacheCtx, keys) :
+ cacheCtx.cache().expiryPolicy(expiryPlc);
+
if (cacheCtx.isNear()) {
return cacheCtx.nearTx().txLoadAsync(this,
topVer,
keys,
readThrough,
/*deserializeBinary*/false,
+ recovery,
- accessPolicy(cacheCtx, keys),
+ expiryPlc0,
skipVals,
needVer).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
@Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
@@@ -402,8 -404,7 +407,8 @@@
CU.subjectId(this, cctx),
resolveTaskName(),
/*deserializeBinary*/false,
+ recovery,
- accessPolicy(cacheCtx, keys),
+ expiryPlc0,
skipVals,
/*can remap*/true,
needVer,
@@@ -435,9 -436,9 +440,10 @@@
async,
keys,
skipVals,
-- keepBinary,
needVer,
++ keepBinary,
+ recovery,
+ expiryPlc,
c);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 7aaa476,f86df2f..da92692
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@@ -407,12 -412,13 +408,11 @@@ public class GridLocalAtomicCache<K, V
if (entry != null) {
CacheObject v;
- GridCacheVersion ver;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
- /*swap*/swapOrOffheap,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 6731179,7af3485..da5b326
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -365,12 -380,22 +377,21 @@@ public abstract class GridCacheQueryMan
* @param key Key.
* @throws IgniteCheckedException If failed.
*/
- public void onSwap(CacheObject key) throws IgniteCheckedException {
+ public void onSwap(KeyCacheObject key, int partId) throws IgniteCheckedException {
+ if(!enabled)
+ return;
-
if (!enterBusy())
return; // Ignore index update when node is stopping.
try {
- qryProc.onSwap(space, key, partId);
+ if (isIndexingSpiEnabled()) {
+ Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+
+ cctx.kernalContext().indexing().onSwap(space, key0);
+ }
+
+ if(qryProcEnabled)
- qryProc.onSwap(space, key);
++ qryProc.onSwap(space, key, partId);
}
finally {
leaveBusy();
@@@ -384,12 -417,26 +413,25 @@@
* @param val Value
* @throws IgniteCheckedException If failed.
*/
- public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException {
+ public void onUnswap(KeyCacheObject key, int partId, CacheObject val) throws IgniteCheckedException {
+ if(!enabled)
+ return;
-
if (!enterBusy())
return; // Ignore index update when node is stopping.
try {
- qryProc.onUnswap(space, key, partId, val);
+ if (isIndexingSpiEnabled()) {
+ CacheObjectContext coctx = cctx.cacheObjectContext();
+
+ Object key0 = unwrapIfNeeded(key, coctx);
+
+ Object val0 = unwrapIfNeeded(val, coctx);
+
+ cctx.kernalContext().indexing().onUnswap(space, key0, val0);
+ }
+
+ if(qryProcEnabled)
- qryProc.onUnswap(space, key, val);
++ qryProc.onUnswap(space, key, partId, val);
}
finally {
leaveBusy();
@@@ -434,10 -470,21 +476,21 @@@
return; // No-op.
if (!enterBusy())
- return; // Ignore index update when node is stopping.
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- qryProc.store(space, key, partId, prevVal, prevVer, val, ver, expirationTime, link);
+ if (isIndexingSpiEnabled()) {
+ CacheObjectContext coctx = cctx.cacheObjectContext();
+
+ Object key0 = unwrapIfNeeded(key, coctx);
+
+ Object val0 = unwrapIfNeeded(val, coctx);
+
+ cctx.kernalContext().indexing().store(space, key0, val0, expirationTime);
+ }
+
+ if(qryProcEnabled)
- qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
++ qryProc.store(space, key, partId, prevVal, prevVer, val, ver, expirationTime, link);
}
finally {
invalidateResultCache();
@@@ -464,7 -509,14 +517,14 @@@
return; // Ignore index update when node is stopping.
try {
- qryProc.remove(space, key, partId, val, ver);
+ if (isIndexingSpiEnabled()) {
+ Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+
+ cctx.kernalContext().indexing().remove(space, key0);
+ }
+
+ if(qryProcEnabled)
- qryProc.remove(space, key, val);
++ qryProc.remove(space, key, partId, val, ver);
}
finally {
invalidateResultCache();
@@@ -2966,32 -3582,40 +3036,36 @@@
private void advance() {
IgniteBiTuple<K, V> next0 = null;
- while (entryIt.hasNext()) {
- next0 = null;
+ while (it.hasNext()) {
+ CacheDataRow row = it.next();
- GridCacheEntryEx entry = entryIt.next();
+ KeyCacheObject key = row.key();
+ if (entry.deleted())
+ continue;
+
+ KeyCacheObject key = entry.key();
CacheObject val;
- try {
- if (heapOnly)
- val = entry.peek(true, false, false, expiryPlc);
- else
- val = value(entry, entry.key());
- }
- catch (GridCacheEntryRemovedException ignore) {
- assert heapOnly;
-
- continue;
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
+ if (expiryPlc != null) {
+ try {
+ val = value(key);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
- val = null;
- }
+ val = null;
+ }
- if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ if (dht != null && expiryPlc.readyToFlush(100)) {
+ dht.sendTtlUpdateRequest(expiryPlc);
- expiryPlc = cctx.cache().expiryPolicy(plc);
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+ }
}
+ else
+ val = row.value();
if (val != null) {
boolean keepBinary0 = !locNode || keepBinary;
@@@ -3043,22 -3670,20 +3117,18 @@@
* @return Value.
* @throws IgniteCheckedException If failed to peek value.
*/
- private CacheObject value(GridCacheEntryEx entry, KeyCacheObject key) throws IgniteCheckedException {
+ private CacheObject value(KeyCacheObject key) throws IgniteCheckedException {
- GridCacheEntryEx entry = null;
-
- try {
- entry = cache.entryEx(key);
+ while (true) {
+ try {
- if (entry == null)
- entry = cache.entryEx(key);
++ GridCacheEntryEx entry = cache.entryEx(key);
- entry.unswap();
- if (expiryPlc != null)
- entry.unswap();
++ entry.unswap();
- return entry.peek(true, false, topVer, expiryPlc);
- }
- catch (GridCacheEntryRemovedException ignore) {
- return null;
- }
- finally {
- if (entry != null)
- cctx.evicts().touch(entry, topVer);
- return entry.peek(true, true, true, topVer, expiryPlc);
++ return entry.peek(true, true, topVer, expiryPlc);
+ }
+ catch (GridCacheEntryRemovedException ignore) {
- entry = null;
++ // No-op.
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 41eec6a,b3f0684..a540fdd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@@ -864,21 -882,21 +885,22 @@@ public class CacheContinuousQueryHandle
GridCacheAffinityManager aff = cctx.affinity();
if (initUpdCntrsPerNode != null) {
- for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+ for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
- Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
+ Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
if (map != null) {
- partCntr = map.get(partId);
+ partCntrs = map.get(partId);
break;
}
}
}
else if (initUpdCntrs != null)
- partCntr = initUpdCntrs.get(partId);
+ partCntrs = initUpdCntrs.get(partId);
}
- rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntrs != null ? partCntrs.get2() : null);
- rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0, partCntr);
++ rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
++ partCntrs != null ? partCntrs.get2() : null);
PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ba2ab3c,90a68ad..9f3463f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -1502,20 -1490,19 +1502,20 @@@ public class IgniteTxHandler
/*expiryPlc*/null,
/*keepBinary*/true);
- if (val == null)
- val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
+ if (val == null)
+ val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
- if (val != null)
- entry.readValue(val);
+ if (val != null)
+ entry.readValue(val);
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got entry removed exception, will retry: " + entry.txKey());
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got entry removed exception, will retry: " + entry.txKey());
- entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
+ entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f05fdf7,bd806aa..f334b84
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -403,7 -400,7 +404,8 @@@ public abstract class IgniteTxLocalAdap
boolean skipVals,
boolean needVer,
boolean keepBinary,
+ boolean recovery,
+ final ExpiryPolicy expiryPlc,
final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
) {
assert cacheCtx.isLocal() : cacheCtx.name();
@@@ -431,9 -430,11 +435,9 @@@
continue;
try {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
this,
- /*readSwap*/true,
- /*unmarshal*/true,
/*update-metrics*/!skipVals,
/*event*/!skipVals,
CU.subjectId(this, cctx),
@@@ -1246,9 -1224,11 +1256,9 @@@
F.first(txEntry.entryProcessors()) : null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned(
+ getRes = txEntry.cached().innerGetVersioned(
null,
this,
- /*swap*/true,
- /*unmarshal*/true,
/*update-metrics*/true,
/*event*/!skipVals,
CU.subjectId(this, cctx),
@@@ -1499,7 -1497,7 +1522,8 @@@
skipVals,
needReadVer,
!deserializeBinary,
+ recovery,
+ expiryPlc,
new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
@Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
if (isRollbackOnly()) {
@@@ -1675,10 -1681,12 +1708,10 @@@
F.first(txEntry.entryProcessors()) : null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned(
+ getRes = cached.innerGetVersioned(
null,
IgniteTxLocalAdapter.this,
- /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
- /*update-metrics*/true,
+ /**update-metrics*/true,
/*event*/!skipVals,
CU.subjectId(IgniteTxLocalAdapter.this, cctx),
transformClo,
@@@ -2041,7 -2055,7 +2081,8 @@@
/*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary,
- recovery);
++ recovery,
+ expiryPlc);
}
return new GridFinishedFuture<>();
@@@ -2212,7 -2225,7 +2253,8 @@@
/*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary,
- recovery);
++ recovery,
+ expiryPlc);
}
return new GridFinishedFuture<>();
@@@ -2232,8 -2245,7 +2274,9 @@@
* @param hasFilters {@code True} if filters not empty.
* @param readThrough Read through flag.
* @param retval Return value flag.
+ * @param keepBinary Keep binary flag.
+ * @param recovery Recovery flag.
+ * @param expiryPlc Expiry policy.
* @return Load future.
*/
private IgniteInternalFuture<Void> loadMissing(
@@@ -2248,8 -2260,7 +2291,8 @@@
final boolean readThrough,
final boolean retval,
final boolean keepBinary,
- final boolean recovery
- ) {
++ final boolean recovery,
+ final ExpiryPolicy expiryPlc) {
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
@Override public void apply(KeyCacheObject key,
@@@ -2323,7 -2334,7 +2366,8 @@@
/*skipVals*/singleRmv,
needReadVer,
keepBinary,
+ recovery,
+ expiryPlc,
c);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index d6b09e4,f5687a0..cf1e7e2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@@ -192,6 -194,6 +194,7 @@@ public interface IgniteTxLocalEx extend
boolean skipVals,
boolean needVer,
boolean keepBinary,
+ boolean recovery,
+ final ExpiryPolicy expiryPlc,
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9012214,68479a6..df2f7d9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@@ -1925,59 -1926,24 +1923,59 @@@ public class DataStreamerImpl<K, V> imp
ExpiryPolicy plc = cctx.expiry();
- for (Entry<KeyCacheObject, CacheObject> e : entries) {
- try {
- e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+ Collection<Integer> reservedParts = new HashSet<>();
+ Collection<Integer> ignoredParts = new HashSet<>();
+
+ try {
+ for (Entry<KeyCacheObject, CacheObject> e : entries) {
+ cctx.shared().database().checkpointReadLock();
- GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
+ try {
+ e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
- if (plc != null) {
- ttl = CU.toTtl(plc.getExpiryForCreation());
+ if (!cctx.isLocal()) {
+ int p = cctx.affinity().partition(e.getKey());
- if (ttl == CU.TTL_ZERO)
- continue;
- else if (ttl == CU.TTL_NOT_CHANGED)
- ttl = 0;
+ if (ignoredParts.contains(p))
+ continue;
- expiryTime = CU.toExpireTime(ttl);
- }
+ if (!reservedParts.contains(p)) {
+ GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, true);
+
+ if (!part.reserve()) {
+ ignoredParts.add(p);
+
+ continue;
+ }
+ else {
+ // We must not allow to read from RENTING partitions.
+ if (part.state() == GridDhtPartitionState.RENTING) {
+ part.release();
+
+ ignoredParts.add(p);
+
+ continue;
+ }
+
+ reservedParts.add(p);
+ }
+ }
+ }
+
+ GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
+
+ if (plc != null) {
+ ttl = CU.toTtl(plc.getExpiryForCreation());
+
+ if (ttl == CU.TTL_ZERO)
+ continue;
+ else if (ttl == CU.TTL_NOT_CHANGED)
+ ttl = 0;
+
+ expiryTime = CU.toExpireTime(ttl);
+ }
- boolean primary = cctx.affinity().primary(cctx.localNode(), entry.key(), topVer);
+ boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer);
entry.initialValue(e.getValue(),
ver,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 03a6f9b,fddb8df..3607a5b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@@ -980,39 -867,13 +954,14 @@@ public class GridQueryProcessor extend
if (typeDesc == null || !typeDesc.registered())
throw new CacheException("Failed to find SQL table for type: " + type);
- final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql(
- space,
- sqlQry,
- F.asList(params),
- typeDesc,
- idx.backupFilter(requestTopVer.get(), null));
+ qry.setType(typeDesc.name());
sendQueryExecutedEvent(
- sqlQry,
- params,
- space);
-
- return new ClIter<Cache.Entry<K, V>>() {
- @Override public void close() throws Exception {
- i.close();
- }
-
- @Override public boolean hasNext() {
- return i.hasNext();
- }
-
- @Override public Cache.Entry<K, V> next() {
- IgniteBiTuple<K, V> t = i.next();
-
- return new CacheEntryImpl<>(
- (K)cctx.unwrapBinaryIfNeeded(t.getKey(), keepBinary, false),
- (V)cctx.unwrapBinaryIfNeeded(t.getValue(), keepBinary, false));
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ qry.getSql(),
- qry.getArgs());
++ qry.getArgs(),
++ cctx.name());
+
+ return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
}
}, true);
}
@@@ -1093,34 -994,26 +1082,26 @@@
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- final boolean keepBinary = cctx.keepBinary();
-
return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
@Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
- final String space = cctx.name();
- final String sql = qry.getSql();
- final Object[] args = qry.getArgs();
- final GridQueryCancel cancel = new GridQueryCancel();
+ GridQueryCancel cancel = new GridQueryCancel();
- final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
- idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+ final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
+ idx.backupFilter(requestTopVer.get(), null), cancel);
- QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
- try {
- sendQueryExecutedEvent(sql, args, space);
-
- return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }, cancel);
- sendQueryExecutedEvent(qry.getSql(), qry.getArgs());
++ sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name());
- cursor.fieldsMeta(res.metaData());
-
- return cursor;
+ return cursor.iterator();
+ }
+ }, cancel) {
+ @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+ if (cursor instanceof QueryCursorImpl)
+ return ((QueryCursorImpl)cursor).fieldsMeta();
+ return super.fieldsMeta();
+ }
+ };
}
}, true);
}
@@@ -1161,15 -1043,7 +1131,14 @@@
throw new IllegalStateException("Failed to remove from index (grid is stopping).");
try {
- if (coctx == null)
- coctx = cacheObjectContext(space);
- idx.remove(space, key, val);
++ CacheObjectContext coctx = cacheObjectContext(space);
+
+ TypeDescriptor desc = typeByValue(coctx, key, val, false);
+
+ if (desc == null)
+ return;
+
+ idx.remove(space, desc, key, partId, val, ver);
}
finally {
busyLock.leaveBusy();
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 914c3a3,99146aa..13cbf1d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -305,58 -308,54 +305,59 @@@ public class GridServiceProcessor exten
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
- if (ctx.isDaemon())
- return;
-
busyLock.block();
- if (!ctx.clientNode())
- ctx.event().removeDiscoveryEventListener(topLsnr);
-
- Collection<ServiceContextImpl> ctxs = new ArrayList<>();
+ try {
+ if (ctx.isDaemon())
+ return;
- synchronized (locSvcs) {
- for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
- ctxs.addAll(ctxs0);
- }
+ if (!ctx.clientNode())
- ctx.event().removeLocalEventListener(topLsnr);
++ ctx.event().removeDiscoveryEventListener(topLsnr);
- for (ServiceContextImpl ctx : ctxs) {
- ctx.setCancelled(true);
+ Collection<ServiceContextImpl> ctxs = new ArrayList<>();
- Service svc = ctx.service();
+ synchronized (locSvcs) {
+ for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
+ ctxs.addAll(ctxs0);
+ }
- if (svc != null)
- svc.cancel(ctx);
+ for (ServiceContextImpl ctx : ctxs) {
+ ctx.setCancelled(true);
- ctx.executor().shutdownNow();
- }
+ Service svc = ctx.service();
- for (ServiceContextImpl ctx : ctxs) {
- try {
- if (log.isInfoEnabled() && !ctxs.isEmpty())
- log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" +
- U.id8(ctx.executionId()) + ']');
+ if (svc != null)
+ svc.cancel(ctx);
- ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ ctx.executor().shutdownNow();
}
- catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " +
- ctx.name());
+ for (ServiceContextImpl ctx : ctxs) {
+ try {
+ if (log.isInfoEnabled() && !ctxs.isEmpty())
+ log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" +
+ U.id8(ctx.executionId()) + ']');
+
+ ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+
+ U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " +
+ ctx.name());
+ }
}
- }
- U.shutdownNow(GridServiceProcessor.class, depExe, log);
+ U.shutdownNow(GridServiceProcessor.class, depExe, log);
- Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+ Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
- cancelFutures(depFuts, err);
- cancelFutures(undepFuts, err);
+ cancelFutures(depFuts, err);
+ cancelFutures(undepFuts, err);
- }finally {
++ }
++ finally {
+ busyLock.unblock();
+ }
if (log.isDebugEnabled())
log.debug("Stopped service processor.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1be9f75,74e4450..e7951f9
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@@ -74,7 -74,7 +74,8 @@@ import java.nio.channels.SelectionKey
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.nio.file.Files;
+import java.nio.file.Path;
+ import java.nio.file.Paths;
import java.security.AccessController;
import java.security.KeyManagementException;
import java.security.MessageDigest;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
index 9a25aa8,a64ec6d..e4c356a
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
@@@ -285,7 -294,11 +297,10 @@@ public class VisorGatewayTask implement
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public Object execute() throws IgniteException {
+ if (fut != null)
+ return fut.get();
+
String nidsArg = argument(0);
String taskName = argument(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index cc1c678,1ac90ad..fbc617b
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@@ -131,9 -131,8 +131,9 @@@ public class VisorQueryJob extends Viso
if (scanWithFilter) {
boolean caseSensitive = qryTxt.startsWith(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE);
- String ptrn = qryTxt.substring(caseSensitive
- String ptrn = qryTxt.substring(
- caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length() : SCAN_CACHE_WITH_FILTER.length());
++ String ptrn = qryTxt.substring(caseSensitive
+ ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length()
+ : SCAN_CACHE_WITH_FILTER.length());
filter = new VisorQueryScanSubstringFilter(caseSensitive, ptrn);
}
@@@ -161,7 -160,8 +161,8 @@@
SqlFieldsQuery qry = new SqlFieldsQuery(arg.queryTxt());
qry.setPageSize(arg.pageSize());
qry.setLocal(arg.local());
- qry.setDistributedJoins(arg instanceof VisorQueryArgV2 && ((VisorQueryArgV2)arg).distributedJoins());
- qry.setEnforceJoinOrder(arg instanceof VisorQueryArgV3 && ((VisorQueryArgV3)arg).enforceJoinOrder());
+ qry.setDistributedJoins(arg.distributedJoins());
++ qry.setEnforceJoinOrder(arg.enforceJoinOrder());
long start = U.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index 911e832,8e4590e..e1cd7cf
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@@ -52,8 -53,9 +52,8 @@@ import org.apache.ignite.IgniteLogger
import org.apache.ignite.cache.eviction.EvictionPolicy;
import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicyMBean;
import org.apache.ignite.cache.eviction.lru.LruEvictionPolicyMBean;
- import org.apache.ignite.cache.eviction.random.RandomEvictionPolicyMBean;
+ import org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicyMBean;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.util.typedef.F;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 2c86322,ba75dcb..ed356fd
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@@ -5869,14 -5865,15 +5865,14 @@@ public abstract class GridCacheAbstract
@Override public void run(int idx) throws Exception {
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
- if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
- return;
-
int size = 0;
+ if (ctx.isNear())
+ ctx = ctx.near().dht().context();
+
for (String key : keys) {
- if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+ if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
- GridCacheEntryEx e =
- ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+ GridCacheEntryEx e = ctx.cache().entryEx(key);
assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
assert !e.deleted() : "Entry is deleted: " + e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index be3933f,1ecc2d1..49f0c98
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@@ -411,9 -417,31 +410,29 @@@ public class GridCacheTestEntryEx exten
}
/** @inheritDoc */
- @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ @Override public void clearReserveForLoad(GridCacheVersion ver) {
+ assert false;
+ }
+
+ /** @inheritDoc */
+ @Override public EntryGetResult innerGetAndReserveForLoad(
+ boolean readSwap,
+ boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean keepBinary,
+ @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ assert false;
+
+ return null;
+ }
+
+ /** @inheritDoc */
+ @Nullable @Override public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
- boolean readSwap,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
UUID subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index b746883,e47a18d..a78d8ef
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@@ -5553,12 -5545,10 +5551,12 @@@ public class IgniteCacheConfigVariation
int size = 0;
+ if (ctx.isNear())
+ ctx = ctx.near().dht().context();
+
for (String key : keys) {
- if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+ if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
- GridCacheEntryEx e =
- ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+ GridCacheEntryEx e = ctx.cache().entryEx(key);
assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
assert !e.deleted() : "Entry is deleted: " + e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
index 4f02fa2,aeca2fb..c0d7745
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
@@@ -364,14 -363,10 +364,14 @@@ public abstract class GridCacheAbstract
GridCacheAffinityManager aff = cctx.affinity();
+ CachePeekMode[] modes = new CachePeekMode[]{CachePeekMode.ALL};
+
for (int i = 0; i < gridCount(); i++) {
- for (GridCacheEntryEx e : ((IgniteKernal)grid(i)).context().cache().internalCache(cctx.name()).allEntries()) {
- if (aff.primaryByKey(grid(i).localNode(), e.key(), AffinityTopologyVersion.NONE)
- && e.key().value(cctx.cacheObjectContext(), false) instanceof GridCacheQueueHeaderKey)
+ for (Cache.Entry e : grid(i).context().cache().internalCache(cctx.name()).localEntries(modes)) {
+ Object key = e.getKey();
+
- if (aff.primary(grid(i).localNode(), key, AffinityTopologyVersion.NONE)
++ if (aff.primaryByKey(grid(i).localNode(), key, AffinityTopologyVersion.NONE)
+ && key instanceof GridCacheQueueHeaderKey)
return i;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 177c878,3fd4dd8..fd310c4
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@@ -384,13 -384,17 +384,13 @@@ public class GridCacheAtomicInvalidPart
GridCacheEntryEx entry = null;
- if (memMode == TestMemoryMode.HEAP)
- entry = c.peekEx(k);
- else {
- try {
- entry = c.entryEx(k);
+ try {
+ entry = c.entryEx(k);
- entry.unswap();
- }
- catch (GridDhtInvalidPartitionException ignored) {
- // Skip key.
- }
+ entry.unswap();
+ }
- catch (GridDhtInvalidPartitionException e) {
++ catch (GridDhtInvalidPartitionException ignored) {
+ // Skip key.
}
for (int r = 0; r < 10; r++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 04004b7,7e3ff5c..e484f1c
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@@ -237,10 -237,10 +237,10 @@@ public class GridCacheRebalancingSyncSe
int waitMinorVer = ignite.configuration().isLateAffinityAssignment() ? 1 : 0;
- waitForRebalancing(0, new AffinityTopologyVersion(2, waitMinorVer));
- waitForRebalancing(1, new AffinityTopologyVersion(2, waitMinorVer));
+ waitForRebalancing(0, 2, waitMinorVer);
+ waitForRebalancing(1, 2, waitMinorVer);
- awaitPartitionMapExchange(true, true, null);
+ awaitPartitionMapExchange(true, true, null, true);
checkPartitionMapExchangeFinished();
@@@ -258,10 -258,10 +258,10 @@@
startGrid(2);
- waitForRebalancing(1, new AffinityTopologyVersion(4, waitMinorVer));
- waitForRebalancing(2, new AffinityTopologyVersion(4, waitMinorVer));
+ waitForRebalancing(1, 4, waitMinorVer);
+ waitForRebalancing(2, 4, waitMinorVer);
- awaitPartitionMapExchange(true, true, null);
+ awaitPartitionMapExchange(true, true, null, true);
checkPartitionMapExchangeFinished();
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/EvictionAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 4ceb1b6,8f08ea9..806fc8e
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@@ -36,11 -36,16 +36,14 @@@ import javax.cache.expiry.ExpiryPolicy
import javax.cache.expiry.ModifiedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
+ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMemoryMode;
+ import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@@ -49,7 -54,9 +52,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.internal.S;
+ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
index fe53fc7,1f6ec2d..b5ca2de
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java
@@@ -36,9 -40,11 +40,12 @@@ import org.apache.ignite.internal.Ignit
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.transactions.Transaction;
+ import org.apache.ignite.transactions.TransactionConcurrency;
+ import org.apache.ignite.transactions.TransactionIsolation;
/**
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 480928c,3bdf0bd..f0b6621
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@@ -61,7 -58,7 +61,8 @@@ import org.apache.ignite.configuration.
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index d225501,03204e2..d721c69
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@@ -77,16 -66,8 +78,18 @@@ public class IgniteCacheTestSuite5 exte
suite.addTestSuite(CacheRebalancingSelfTest.class);
+ // Affinity tests.
+ suite.addTestSuite(FairAffinityFunctionNodesSelfTest.class);
+ suite.addTestSuite(FairAffinityFunctionSelfTest.class);
+ suite.addTestSuite(FairAffinityDynamicCacheSelfTest.class);
+ suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class);
+ suite.addTestSuite(IgniteCacheAffinitySelfTest.class);
+ suite.addTestSuite(AffinityClientNodeSelfTest.class);
+ suite.addTestSuite(LocalAffinityFunctionTest.class);
+ suite.addTestSuite(AffinityHistoryCleanupTest.class);
+
+ suite.addTestSuite(PartitionsExchangeOnDiscoveryHistoryOverflowTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 9a0f00f,41035ec..c6ada36
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@@ -42,9 -42,9 +42,8 @@@ import org.apache.ignite.internal.manag
import org.apache.ignite.internal.managers.deployment.GridDeploymentManagerStopSelfTest;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAliveCacheSelfTest;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAttributesSelfTest;
- import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerSelfTest;
import org.apache.ignite.internal.managers.discovery.IgniteTopologyPrintFormatSelfTest;
import org.apache.ignite.internal.managers.events.GridEventStorageManagerSelfTest;
-import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManagerSelfTest;
import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest;
import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest;
import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --cc modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index b6060b4,c3a1362..bc27ae7
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@@ -124,7 -134,12 +139,12 @@@ public class GridH2SpatialIndex extend
}
/** {@inheritDoc} */
+ @Override protected int segmentsCount() {
+ return segments.length;
+ }
+
+ /** {@inheritDoc} */
- @Nullable @Override protected Object doTakeSnapshot() {
+ @Nullable @Override protected IgniteTree doTakeSnapshot() {
return null; // TODO We do not support snapshots, but probably this is possible.
}
@@@ -263,7 -282,11 +287,11 @@@
try {
checkClosed();
- return new H2Cursor(rowIterator(treeMap.keySet().iterator(), filter));
+ final int seg = threadLocalSegment();
+
+ final MVRTreeMap<Long> segment = segments[seg];
+
- return new GridH2Cursor(rowIterator(segment.keySet().iterator(), filter));
++ return new H2Cursor(rowIterator(segment.keySet().iterator(), filter));
}
finally {
l.unlock();
@@@ -315,12 -333,13 +343,16 @@@
if (!first)
throw DbException.throwInternalError("Spatial Index can only be fetch by ascending order");
- GridCursor<GridH2Row> iter = rowIterator(treeMap.keySet().iterator(), null);
+ final int seg = threadLocalSegment();
+
+ final MVRTreeMap<Long> segment = segments[seg];
+
- Iterator<GridH2Row> iter = rowIterator(segment.keySet().iterator(), null);
++ GridCursor<GridH2Row> iter = rowIterator(segment.keySet().iterator(), null);
- return new SingleRowCursor(iter.hasNext() ? iter.next() : null);
+ return new SingleRowCursor(iter.next() ? iter.get() : null);
+ }
+ catch (IgniteCheckedException e) {
+ throw DbException.convert(e);
}
finally {
l.unlock();
@@@ -347,7 -366,11 +379,11 @@@
if (intersection == null)
return find(filter.getSession(), null, null);
- return new H2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
+ final int seg = threadLocalSegment();
+
+ final MVRTreeMap<Long> segment = segments[seg];
+
- return new GridH2Cursor(rowIterator(segment.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
++ return new H2Cursor(rowIterator(segment.findIntersectingKeys(getEnvelope(intersection, 0)), filter));
}
finally {
l.unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index cde6da6,2aa4292..da01d18
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@@ -28,8 -28,9 +28,9 @@@ import org.apache.hadoop.mapred.Reporte
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -54,52 -55,64 +55,64 @@@ public class HadoopV1MapTask extends Ha
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- HadoopJob job = taskCtx.job();
+ HadoopJobEx job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
+ if (taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
- InputFormat inFormat = jobConf.getInputFormat();
+ try {
+ JobConf jobConf = taskCtx0.jobConf();
- HadoopInputSplit split = info().inputSplit();
+ InputFormat inFormat = jobConf.getInputFormat();
- InputSplit nativeSplit;
+ HadoopInputSplit split = info().inputSplit();
- if (split instanceof HadoopFileBlock) {
- HadoopFileBlock block = (HadoopFileBlock)split;
+ InputSplit nativeSplit;
- nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
- }
- else
- nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+ if (split instanceof HadoopFileBlock) {
+ HadoopFileBlock block = (HadoopFileBlock)split;
- assert nativeSplit != null;
+ nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+ }
+ else
+ nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
- Reporter reporter = new HadoopV1Reporter(taskCtx);
+ assert nativeSplit != null;
- HadoopV1OutputCollector collector = null;
+ Reporter reporter = new HadoopV1Reporter(taskCtx);
- try {
- collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
- fileName(), ctx.attemptId());
+ HadoopV1OutputCollector collector = null;
- RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+ try {
+ collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+ fileName(), taskCtx0.attemptId());
- Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+ RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
- Object key = reader.createKey();
- Object val = reader.createValue();
+ Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
- assert mapper != null;
+ Object key = reader.createKey();
+ Object val = reader.createValue();
+
+ assert mapper != null;
- try {
try {
- while (reader.next(key, val)) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Map task cancelled.");
+ try {
+ while (reader.next(key, val)) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Map task cancelled.");
+
+ mapper.map(key, val, collector, reporter);
+ }
- mapper.map(key, val, collector, reporter);
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ mapper.close();
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 6b90653,5c1dd15..2bbf8bc
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@@ -22,7 -22,8 +22,8 @@@ import org.apache.hadoop.mapred.Reducer
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -51,34 -52,47 +52,47 @@@ public class HadoopV1ReduceTask extend
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- HadoopJob job = taskCtx.job();
+ HadoopJobEx job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
-
- HadoopTaskInput input = taskCtx.input();
-
- HadoopV1OutputCollector collector = null;
+ if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
try {
- collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+ JobConf jobConf = taskCtx0.jobConf();
- Reducer reducer;
- if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
- jobConf);
- else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
- jobConf);
+ HadoopTaskInput input = taskCtx.input();
- assert reducer != null;
+ HadoopV1OutputCollector collector = null;
try {
+ collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+
+ Reducer reducer;
+ if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+ jobConf);
+ else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+ jobConf);
+
+ assert reducer != null;
+
try {
- while (input.next()) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Reduce task cancelled.");
+ try {
+ while (input.next()) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+ reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ }
- reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ if (!reduce)
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ reducer.close();
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index 11f2ecc,6202622..5f8c506
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@@ -30,8 -30,7 +30,7 @@@ import org.apache.hadoop.mapreduce.lib.
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
- import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------