You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/24 14:12:33 UTC
[43/50] [abbrv] ignite git commit: Merge branch master ignite-2.0 to
ignite-3477
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index d746d54,546be37..00a4b2e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -507,10 -482,10 +490,10 @@@ public abstract class GridCacheQueryMan
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings("SimplifiableIfStatement")
- public void remove(CacheObject key, CacheObject val) throws IgniteCheckedException {
+ public void remove(KeyCacheObject key, int partId, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException {
assert key != null;
- if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal))
+ if (!QueryUtils.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal))
return; // No-op.
if (!enterBusy())
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f334b84,dc4e52f..a59ff51
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -36,12 -30,8 +30,12 @@@ import javax.cache.expiry.Duration
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.IgniteInternalFuture;
- import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@@ -1142,304 -963,165 +995,163 @@@ public abstract class IgniteTxLocalAdap
/**
* @param cacheCtx Cache context.
- * @param keys Key to enlist.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param map Return map.
- * @param missed Map of missed keys.
- * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
- * @param deserializeBinary Deserialize binary flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects flag.
- * @param skipStore Skip store flag.
- * @throws IgniteCheckedException If failed.
- * @return Enlisted keys.
+ * @param keys Keys.
+ * @return Expiry policy.
*/
- @SuppressWarnings({"RedundantTypeArguments"})
- private <K, V> Collection<KeyCacheObject> enlistRead(
- final GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
- @Nullable ExpiryPolicy expiryPlc,
- Map<K, V> map,
- Map<KeyCacheObject, GridCacheVersion> missed,
- int keysCnt,
- boolean deserializeBinary,
- boolean skipVals,
- boolean keepCacheObjects,
- boolean skipStore,
- final boolean needVer,
- final boolean recovery
- ) throws IgniteCheckedException {
- assert !F.isEmpty(keys);
- assert keysCnt == keys.size();
-
- cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
-
- boolean single = keysCnt == 1;
-
- Collection<KeyCacheObject> lockKeys = null;
-
- AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
-
- boolean needReadVer = (serializable() && optimistic()) || needVer;
-
- // In this loop we cover only read-committed or optimistic transactions.
- // Transactions that are pessimistic and not read-committed are covered
- // outside of this loop.
- for (KeyCacheObject key : keys) {
- if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
- addActiveCache(cacheCtx, recovery);
-
- IgniteTxKey txKey = cacheCtx.txKey(key);
-
- // Check write map (always check writes first).
- IgniteTxEntry txEntry = entry(txKey);
-
- // Either non-read-committed or there was a previous write.
- if (txEntry != null) {
- CacheObject val = txEntry.value();
-
- if (txEntry.hasValue()) {
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- if (val != null) {
- GridCacheVersion ver = null;
+ protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+ return null;
+ }
- if (needVer) {
- if (txEntry.op() != READ)
- ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
- else {
- ver = txEntry.entryReadVersion();
+ /**
+ * Post lock processing for put or remove.
+ *
+ * @param cacheCtx Context.
+ * @param keys Keys.
+ * @param ret Return value.
+ * @param rmv {@code True} if remove.
+ * @param retval Flag to return value or not.
+ * @param read {@code True} if read.
+ * @param accessTtl TTL for read operation.
+ * @param filter Filter to check entries.
+ * @throws IgniteCheckedException If error.
+ * @param computeInvoke If {@code true} computes return value for invoke operation.
+ */
+ @SuppressWarnings("unchecked")
+ protected final void postLockWrite(
+ GridCacheContext cacheCtx,
+ Iterable<KeyCacheObject> keys,
+ GridCacheReturn ret,
+ boolean rmv,
+ boolean retval,
+ boolean read,
+ long accessTtl,
+ CacheEntryPredicate[] filter,
+ boolean computeInvoke
+ ) throws IgniteCheckedException {
+ for (KeyCacheObject k : keys) {
+ IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
- if (ver == null && pessimistic()) {
- while (true) {
- try {
- GridCacheEntryEx cached = txEntry.cached();
+ if (txEntry == null)
+ throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
+ "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
- ver = cached.isNear() ?
- ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+ while (true) {
+ GridCacheEntryEx cached = txEntry.cached();
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
- }
- }
- }
+ try {
+ assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
+ "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
+ ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
- if (ver == null) {
- assert optimistic() && repeatableRead() : this;
+ if (log.isDebugEnabled())
+ log.debug("Post lock write entry: " + cached);
- ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
- }
- }
+ CacheObject v = txEntry.previousValue();
+ boolean hasPrevVal = txEntry.hasPreviousValue();
- assert ver != null;
- }
+ if (onePhaseCommit())
+ filter = txEntry.filters();
- cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
- ver, 0, 0);
- }
- }
- else {
- assert txEntry.op() == TRANSFORM;
+ // If we have user-passed filter, we must read value into entry for peek().
+ if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
+ retval = true;
- while (true) {
- try {
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
+ boolean invoke = txEntry.op() == TRANSFORM;
- Object transformClo =
- (txEntry.op() == TRANSFORM &&
- cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
- F.first(txEntry.entryProcessors()) : null;
+ if (retval || invoke) {
+ if (!cacheCtx.isNear()) {
+ if (!hasPrevVal) {
+ // For non-local cache should read from store after lock on primary.
+ boolean readThrough = cacheCtx.isLocal() &&
+ (invoke || cacheCtx.loadPreviousValue()) &&
+ !txEntry.skipStore();
- if (needVer) {
- getRes = txEntry.cached().innerGetVersioned(
+ v = cached.innerGet(
null,
this,
- /*update-metrics*/true,
- /*event*/!skipVals,
- /*swap*/true,
+ readThrough,
+ /*metrics*/!invoke,
+ /*event*/!invoke && !dht(),
- /*temporary*/false,
CU.subjectId(this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary(),
- null);
-
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
- }
- }
- else {
- val = txEntry.cached().innerGet(
null,
- this,
- /*read-through*/false,
- /*metrics*/true,
- /*event*/!skipVals,
- CU.subjectId(this, cctx),
- transformClo,
resolveTaskName(),
null,
txEntry.keepBinary());
}
-
- if (val != null) {
- if (!readCommitted() && !skipVals)
- txEntry.readValue(val);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- cacheCtx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
- }
- else
- missed.put(key, txEntry.cached().version());
-
- break;
}
- catch (GridCacheEntryRemovedException ignored) {
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ else {
+ if (!hasPrevVal)
- v = cached.rawGetOrUnmarshal(false);
++ v = cached.rawGet();
}
- }
- }
- }
- // First time access within transaction.
- else {
- if (lockKeys == null && !skipVals)
- lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
- if (!single && !skipVals)
- lockKeys.add(key);
+ if (txEntry.op() == TRANSFORM) {
+ if (computeInvoke) {
+ GridCacheVersion ver;
+
+ try {
+ ver = cached.version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
- while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
- try {
- GridCacheVersion ver = entry.version();
-
- CacheObject val = null;
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
-
- if (!pessimistic() || readCommitted() && !skipVals) {
- IgniteCacheExpiryPolicy accessPlc =
- optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
-
- if (needReadVer) {
- getRes = primaryLocal(entry) ?
- entry.innerGetVersioned(
- null,
- this,
- /*metrics*/true,
- /*event*/true,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc,
- !deserializeBinary,
- null) : null;
-
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
+ ver = null;
}
- }
- else {
- val = entry.innerGet(
- null,
- this,
- /*no read-through*/false,
- /*metrics*/true,
- /*event*/true,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc,
- !deserializeBinary);
- }
- if (val != null) {
- cacheCtx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
+ addInvokeResult(txEntry, v, ret, ver);
}
- else
- missed.put(key, ver);
}
else
- // We must wait for the lock in pessimistic mode.
- missed.put(key, ver);
-
- if (!readCommitted() && !skipVals) {
- txEntry = addEntry(READ,
- val,
- null,
- null,
- entry,
- expiryPlc,
- null,
- true,
- -1L,
- -1L,
- null,
- skipStore,
- !deserializeBinary);
-
- // As optimization, mark as checked immediately
- // for non-pessimistic if value is not null.
- if (val != null && !pessimistic()) {
- txEntry.markValid();
-
- if (needReadVer) {
- assert readVer != null;
-
- txEntry.entryReadVersion(readVer);
- }
- }
- }
-
- break; // While.
+ ret.value(cacheCtx, v, txEntry.keepBinary());
}
- catch (GridCacheEntryRemovedException ignored) {
+
+ boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
+
+ // For remove operation we return true only if we are removing s/t,
+ // i.e. cached value is not null.
+ ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
+
+ if (onePhaseCommit())
+ txEntry.filtersPassed(pass);
+
+ boolean updateTtl = read;
+
+ if (pass) {
+ txEntry.markValid();
+
if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+ log.debug("Filter passed in post lock for key: " + k);
}
- finally {
- if (entry != null && readCommitted()) {
- if (cacheCtx.isNear()) {
- if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
- if (entry.markObsolete(xidVer))
- cacheCtx.cache().removeEntry(entry);
- }
- }
- else
- entry.context().evicts().touch(entry, topVer);
+ else {
+ // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
+ txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
+ txEntry.filters(CU.empty0());
+ txEntry.filtersSet(false);
+
+ updateTtl = !cacheCtx.putIfAbsentFilter(filter);
+ }
+
+ if (updateTtl) {
+ if (!read) {
+ ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry);
+
+ if (expiryPlc != null)
+ txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
}
+ else
+ txEntry.ttl(accessTtl);
}
+
+ break; // While.
+ }
+ // If entry cached within transaction got removed before lock.
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
+
+ txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion()));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index eb3524b,e0549fb..4726d86
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@@ -35,10 -35,10 +35,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
+import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
- import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+ import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@@ -160,13 -174,13 +160,13 @@@ public class IgniteCacheObjectProcessor
}
/** {@inheritDoc} */
- @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
+ @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) throws IgniteCheckedException {
switch (type) {
case CacheObject.TYPE_BYTE_ARR:
- return new CacheObjectByteArrayImpl(bytes);
+ throw new IllegalArgumentException("Byte arrays cannot be used as cache keys.");
case CacheObject.TYPE_REGULAR:
- return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes);
- return new CacheObjectImpl(null, bytes);
++ return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes, -1);
}
throw new IllegalArgumentException("Invalid object type: " + type);
@@@ -299,8 -242,10 +299,8 @@@
@Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException {
assert ccfg != null;
- CacheMemoryMode memMode = ccfg.getMemoryMode();
-
boolean storeVal = !ccfg.isCopyOnRead() || (!isBinaryEnabled(ccfg) &&
- (GridQueryProcessor.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled()));
+ (QueryUtils.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled()));
CacheObjectContext res = new CacheObjectContext(ctx,
ccfg.getName(),
@@@ -421,18 -362,10 +417,18 @@@
Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
- KeyCacheObjectImpl key = new KeyCacheObjectImpl(val, valBytes);
- return new KeyCacheObjectImpl(val, valBytes, partition());
++ KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
+
+ key.partition(partition());
+
+ return key;
}
- KeyCacheObjectImpl key = new KeyCacheObjectImpl(val, valBytes);
- return new KeyCacheObjectImpl(val, valBytes, partition());
++ KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
+
+ key.partition(partition());
+
+ return key;
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to marshal object: " + val, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 124cb4b,cfe94b2..075fb06
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@@ -121,11 -139,30 +139,30 @@@ public class ClusterProcessor extends G
}
}
+
+ /**
+ * @param vals collection to seek through.
+ */
+ private Boolean findLastFlag(Collection<Serializable> vals) {
+ Boolean flag = null;
+
+ for (Serializable ser : vals) {
+ if (ser != null) {
+ Map<String, Object> map = (Map<String, Object>) ser;
+
+ if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
+ flag = (Boolean) map.get(ATTR_UPDATE_NOTIFIER_STATUS);
+ }
+ }
+
+ return flag;
+ }
+
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
if (notifyEnabled.get()) {
try {
- verChecker = new GridUpdateNotifier(ctx.gridName(),
+ verChecker = new GridUpdateNotifier(ctx.igniteInstanceName(),
VER_STR,
ctx.gateway(),
ctx.plugins().allProviders(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 87d54a1,0000000..e5919e0
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@@ -1,947 -1,0 +1,944 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
- import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
- import org.apache.ignite.internal.processors.cache.ClusterState;
+import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
++import org.apache.ignite.internal.processors.cache.ClusterState;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
++import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION;
+
+/**
+ *
+ */
+public class GridClusterStateProcessor extends GridProcessorAdapter {
+ /** Global status. */
+ private volatile ClusterState globalState;
+
+ /** Action context. */
+ private volatile ChangeGlobalStateContext lastCgsCtx;
+
+ /** Local action future. */
+ private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>();
+
+ /** Process. */
+ @GridToStringExclude
+ private GridCacheProcessor cacheProc;
+
+ /** Shared context. */
+ @GridToStringExclude
+ private GridCacheSharedContext<?, ?> sharedCtx;
+
+ //todo may be add init latch
+
+ /** Listener. */
+ private final GridLocalEventListener lsr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt != null;
+
+ final DiscoveryEvent e = (DiscoveryEvent)evt;
+
+ assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
+
+ final GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+ if (f != null)
+ f.initFut.listen(new CI1<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ f.onDiscoveryEvent(e);
+ }
+ });
+ }
+ };
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public GridClusterStateProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+ super.start(activeOnStart);
+
+ globalState = activeOnStart ? ACTIVE : INACTIVE;
+ cacheProc = ctx.cache();
+ sharedCtx = cacheProc.context();
+
+ sharedCtx.io().addHandler(0,
+ GridChangeGlobalStateMessageResponse.class,
+ new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
+ @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
+ processChangeGlobalStateResponse(nodeId, msg);
+ }
+ });
+
+ ctx.discovery().setCustomEventListener(
+ ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() {
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) {
+ assert topVer != null;
+ assert snd != null;
+ assert msg != null;
+
+ boolean activate = msg.activate();
+
+ ChangeGlobalStateContext actx = lastCgsCtx;
+
+ if (actx != null && globalState == TRANSITION) {
+ GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+ if (log.isDebugEnabled())
+ log.debug("Concurrent " + prettyStr(activate) + " [id=" +
+ ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]");
+
+ if (f != null && f.requestId.equals(msg.requestId()))
+ f.onDone(new IgniteCheckedException(
+ "Concurrent change state, now in progress=" + (activate)
+ + ", initiatingNodeId=" + actx.initiatingNodeId
+ + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId()
+ ));
+
+ msg.concurrentChangeState();
+ }
+ else {
+ if (log.isInfoEnabled())
+ log.info("Create " + prettyStr(activate) + " context [id=" +
+ ctx.localNodeId() + " topVer=" + topVer + ", reqId=" +
+ msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]");
+
+ lastCgsCtx = new ChangeGlobalStateContext(
+ msg.requestId(),
+ msg.initiatorNodeId(),
+ msg.getDynamicCacheChangeBatch(),
+ msg.activate());
+
+ globalState = TRANSITION;
+ }
+ }
+ });
+
+ ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ super.stop(cancel);
+
+ sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class);
+ ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
- "Node is stopping: " + ctx.gridName());
++ "Node is stopping: " + ctx.igniteInstanceName());
+
+ GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+ if (f != null)
+ f.onDone(stopErr);
+
+ cgsLocFut.set(null);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return DiscoveryDataExchangeType.STATE_PROC;
+ }
+
+ /** {@inheritDoc} */
- @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
- return globalState;
++ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
++ dataBag.addGridCommonData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState);
+ }
+
+ /** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
- if (ctx.localNodeId().equals(joiningNodeId))
- globalState = (ClusterState)data;
++ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
++ globalState = (ClusterState)data.commonData();
+ }
+
+ /**
+ *
+ */
+ public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
+ if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
+ throw new IgniteException("Cannot " + prettyStr(activate) + " cluster, because cache locked on transaction.");
+
- if ((this.globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate))
++ if ((globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate))
+ return new GridFinishedFuture<>();
+
+ final UUID requestId = UUID.randomUUID();
+
+ final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx);
+
+ if (!cgsLocFut.compareAndSet(null, cgsFut)) {
+ GridChangeGlobalStateFuture locF = cgsLocFut.get();
+
+ if (locF.activate == activate)
+ return locF;
+ else
+ return new GridFinishedFuture<>(new IgniteException(
+ "fail " + prettyStr(activate) + ", because now in progress" + prettyStr(locF.activate)));
+ }
+
+ try {
+ if (ctx.clientNode()) {
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+
+ IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers())
+ .compute().withAsync();
+
+ if (log.isInfoEnabled())
+ log.info("Send " + prettyStr(activate) + " request from client node [id=" +
+ ctx.localNodeId() + " topVer=" + topVer + " ]");
+
+ comp.run(new ClientChangeGlobalStateComputeRequest(activate));
+
+ comp.future().listen(new CI1<IgniteFuture>() {
+ @Override public void apply(IgniteFuture fut) {
+ try {
+ fut.get();
+
+ cgsFut.onDone();
+ }
+ catch (Exception e) {
+ cgsFut.onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
+
+ DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
+ requestId, null, ctx.localNodeId());
+
+ changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
+
+ reqs.add(changeGlobalStateReq);
+
+ reqs.addAll(activate ? cacheProc.startAllCachesRequests() : cacheProc.stopAllCachesRequests());
+
+ ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
+ requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs));
+
+ try {
+ ctx.discovery().sendCustomEvent(changeGlobalStateMsg);
+
+ if (ctx.isStopping())
+ cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
+ "node is stopping."));
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Fail create or send change global state request." + cgsFut, e);
+
+ cgsFut.onDone(e);
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Fail create or send change global state request." + cgsFut, e);
+
+ cgsFut.onDone(e);
+ }
+
+ return cgsFut;
+ }
+
+ /**
+ *
+ */
+ public boolean active() {
+ ChangeGlobalStateContext actx = lastCgsCtx;
+
+ if (actx != null && !actx.activate && globalState == TRANSITION)
+ return true;
+
+ if (actx != null && actx.activate && globalState == TRANSITION)
+ return false;
+
+ return globalState == ACTIVE;
+ }
+
+ /**
+ * @param reqs Requests.
+ */
+ public boolean changeGlobalState(
+ Collection<DynamicCacheChangeRequest> reqs,
+ AffinityTopologyVersion topVer
+ ) {
+ assert !F.isEmpty(reqs);
+ assert topVer != null;
+
+ for (DynamicCacheChangeRequest req : reqs)
+ if (req.globalStateChange()) {
+ ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+
+ assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+
+ cgsCtx.topologyVersion(topVer);
+
+ return true;
+ }
+
+
+ return false;
+ }
+
+ /**
+ * Invoke from exchange future.
+ */
+ public Exception onChangeGlobalState() {
+ GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+ ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+
+ assert cgsCtx != null;
+
+ if (f != null)
+ f.setRemaining(cgsCtx.topVer);
+
+ return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx);
+ }
+
+ /**
+ * @param exs Exs.
+ */
+ public void onFullResponseMessage(Map<UUID, Exception> exs) {
+ assert !F.isEmpty(exs);
+
+ ChangeGlobalStateContext actx = lastCgsCtx;
+
+ actx.setFail();
+
+ // revert change if activation request fail
+ if (actx.activate) {
+ try {
+ cacheProc.onKernalStopCaches(true);
+
+ cacheProc.stopCaches(true);
+
+ sharedCtx.affinity().removeAllCacheInfo();
+
+ if (!ctx.clientNode()) {
+ sharedCtx.database().onDeActivate(ctx);
+
+ if (sharedCtx.pageStore() != null)
+ sharedCtx.pageStore().onDeActivate(ctx);
+
+ if (sharedCtx.wal() != null)
+ sharedCtx.wal().onDeActivate(ctx);
+ }
+ }
+ catch (Exception e) {
+ for (Map.Entry<UUID, Exception> entry : exs.entrySet())
+ e.addSuppressed(entry.getValue());
+
+ log.error("Fail while revert activation request changes", e);
+ }
+ }
+ else {
+ //todo revert change if deactivate request fail
+ }
+
+ globalState = actx.activate ? INACTIVE : ACTIVE;
+
+ GridChangeGlobalStateFuture af = cgsLocFut.get();
+
+ if (af != null && af.requestId.equals(actx.requestId)) {
+ IgniteCheckedException e = new IgniteCheckedException("see suppressed");
+
+ for (Map.Entry<UUID, Exception> entry : exs.entrySet())
+ e.addSuppressed(entry.getValue());
+
+ af.onDone(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private Exception onActivate(ChangeGlobalStateContext cgsCtx) {
+ final boolean client = ctx.clientNode();
+
+ if (log.isInfoEnabled())
+ log.info("Start activation process [nodeId=" + this.ctx.localNodeId() + ", client=" + client +
+ ", topVer=" + cgsCtx.topVer + "]");
+
+ Collection<CacheConfiguration> cfgs = new ArrayList<>();
+
+ for (DynamicCacheChangeRequest req : cgsCtx.batch.requests()) {
+ if (req.startCacheConfiguration() != null)
+ cfgs.add(req.startCacheConfiguration());
+ }
+
+ try {
+ if (!client) {
+ sharedCtx.database().lock();
+
+ IgnitePageStoreManager pageStore = sharedCtx.pageStore();
+
+ if (pageStore != null)
+ pageStore.onActivate(ctx);
+
+ if (sharedCtx.wal() != null)
+ sharedCtx.wal().onActivate(ctx);
+
+ sharedCtx.database().initDataBase();
+
+ for (CacheConfiguration cfg : cfgs) {
+ if (CU.isSystemCache(cfg.getName()))
+ if (pageStore != null)
+ pageStore.initializeForCache(cfg);
+ }
+
+ for (CacheConfiguration cfg : cfgs) {
+ if (!CU.isSystemCache(cfg.getName()))
+ if (pageStore != null)
+ pageStore.initializeForCache(cfg);
+ }
+
+ sharedCtx.database().onActivate(ctx);
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Success activate wal, dataBase, pageStore [nodeId="
+ + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+
+ return null;
+ }
+ catch (Exception e) {
+ log.error("Fail activate wal, dataBase, pageStore [nodeId=" + ctx.localNodeId() + ", client=" + client +
+ ", topVer=" + cgsCtx.topVer + "]", e);
+
+ if (!ctx.clientNode())
+ sharedCtx.database().unLock();
+
+ return e;
+ }
+ }
+
+ /**
+ *
+ */
+ public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) {
+ final boolean client = ctx.clientNode();
+
+ if (log.isInfoEnabled())
+ log.info("Start deactivate process [id=" + ctx.localNodeId() + ", client=" +
+ client + ", topVer=" + cgsCtx.topVer + "]");
+
+ try {
+ ctx.dataStructures().onDeActivate(ctx);
+
+ ctx.service().onDeActivate(ctx);
+
+ if (log.isInfoEnabled())
+ log.info("Success deactivate services, dataStructures, database, pageStore, wal [id=" + ctx.localNodeId() + ", client=" +
+ client + ", topVer=" + cgsCtx.topVer + "]");
+
+ return null;
+ }
+ catch (Exception e) {
+ log.error("DeActivation fail [nodeId=" + ctx.localNodeId() + ", client=" + client +
+ ", topVer=" + cgsCtx.topVer + "]", e);
+
+ return e;
+ }
+ finally {
+ if (!client)
+ sharedCtx.database().unLock();
+ }
+ }
+
+ /**
+ *
+ */
+ private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) {
+ IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ boolean client = ctx.clientNode();
+
+ Exception e = null;
+
+ try {
- ctx.marshallerContext().onMarshallerCacheStarted(ctx);
-
+ if (!ctx.config().isDaemon())
+ ctx.cacheObjects().onUtilityCacheStarted();
+
+ ctx.service().onUtilityCacheStarted();
+
+ ctx.service().onActivate(ctx);
+
+ ctx.dataStructures().onActivate(ctx);
+
+ if (log.isInfoEnabled())
+ log.info("Success final activate [nodeId="
+ + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+ }
+ catch (Exception ex) {
+ e = ex;
+
+ log.error("Fail activate finished [nodeId=" + ctx.localNodeId() + ", client=" + client +
+ ", topVer=" + GridClusterStateProcessor.this.lastCgsCtx.topVer + "]", ex);
+ }
+ finally {
+ globalState = ACTIVE;
+
+ sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e);
+
+ GridClusterStateProcessor.this.lastCgsCtx = null;
+ }
+ }
+ });
+
+ cgsCtx.setAsyncActivateFut(asyncActivateFut);
+ }
+
+ /**
+ *
+ */
+ public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) {
+ final boolean client = ctx.clientNode();
+
+ if (log.isInfoEnabled())
+ log.info("Success final deactivate [nodeId="
+ + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+
+ Exception ex = null;
+
+ try {
+ if (!client) {
+ sharedCtx.database().onDeActivate(ctx);
+
+ if (sharedCtx.pageStore() != null)
+ sharedCtx.pageStore().onDeActivate(ctx);
+
+ if (sharedCtx.wal() != null)
+ sharedCtx.wal().onDeActivate(ctx);
+
+ sharedCtx.affinity().removeAllCacheInfo();
+ }
+ }
+ catch (Exception e) {
+ ex = e;
+ }
+ finally {
+ globalState = INACTIVE;
+ }
+
+ sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex);
+
+ this.lastCgsCtx = null;
+ }
+
+ /**
+ *
+ */
+ public void onExchangeDone() {
+ ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+
+ assert cgsCtx != null;
+
+ if (!cgsCtx.isFail()) {
+ if (cgsCtx.activate)
+ onFinalActivate(cgsCtx);
+ else
+ onFinalDeActivate(cgsCtx);
+ }
+ else
+ lastCgsCtx = null;
+ }
+
+ /**
+ * @param initNodeId Initialize node id.
+ * @param ex Exception.
+ */
+ private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) {
+ assert requestId != null;
+ assert initNodeId != null;
+
+ try {
+ GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex);
+
+ if (log.isDebugEnabled())
+ log.debug("Send change global state response [nodeId=" + ctx.localNodeId() +
+ ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]");
+
+ if (ctx.localNodeId().equals(initNodeId))
+ processChangeGlobalStateResponse(ctx.localNodeId(), actResp);
+ else
+ sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Fail send change global state response to " + initNodeId, e);
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
+ assert nodeId != null;
+ assert msg != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Received activation response [requestId=" + msg.getRequestId() +
+ ", nodeId=" + nodeId + "]");
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" +
+ msg.getRequestId() + ']');
+
+ return;
+ }
+
+ UUID requestId = msg.getRequestId();
+
+ final GridChangeGlobalStateFuture fut = cgsLocFut.get();
+
+ if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) {
+ fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ fut.onResponse(nodeId, msg);
+ }
+ });
+ }
+ }
+
+
+
+ /**
+ * @param activate Activate.
+ */
+ private String prettyStr(boolean activate) {
+ return activate ? "activate" : "deactivate";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridClusterStateProcessor.class, this);
+ }
+
+ /**
+ *
+ */
+ private static class GridChangeGlobalStateFuture extends GridFutureAdapter {
+ /** Request id. */
+ @GridToStringInclude
+ private final UUID requestId;
+
+ /** Activate. */
+ private final boolean activate;
+
+ /** Nodes. */
+ @GridToStringInclude
+ private final Set<UUID> remaining = new HashSet<>();
+
+ /** Responses. */
+ @GridToStringInclude
+ private final Map<UUID, GridChangeGlobalStateMessageResponse> resps = new HashMap<>();
+
+ /** Context. */
+ @GridToStringExclude
+ private final GridKernalContext ctx;
+
+ /** */
+ @GridToStringExclude
+ private final Object mux = new Object();
+
+ /** */
+ @GridToStringInclude
+ private final GridFutureAdapter initFut = new GridFutureAdapter();
+
+ /** Grid logger. */
+ @GridToStringExclude
+ private final IgniteLogger log;
+
+ /**
+ *
+ */
+ public GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
+ this.requestId = requestId;
+ this.activate = activate;
+ this.ctx = ctx;
+ this.log = ctx.log(getClass());
+ }
+
+ /**
+ * @param event Event.
+ */
+ public void onDiscoveryEvent(DiscoveryEvent event) {
+ assert event != null;
+
+ if (isDone())
+ return;
+
+ boolean allReceived = false;
+
+ synchronized (mux) {
+ if (remaining.remove(event.eventNode().id()))
+ allReceived = remaining.isEmpty();
+ }
+
+ if (allReceived)
+ onAllReceived();
+ }
+
+ /**
+ *
+ */
+ public void setRemaining(AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
+
+ List<UUID> ids = new ArrayList<>(nodes.size());
+
+ for (ClusterNode n : nodes)
+ ids.add(n.id());
+
+ if (log.isDebugEnabled())
+ log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" +
+ ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() +
+ ", nodes=" + Arrays.toString(ids.toArray()) + "]");
+
+ synchronized (mux) {
+ remaining.addAll(ids);
+ }
+
+ initFut.onDone();
+ }
+
+ /**
+ * @param msg Activation message response.
+ */
+ public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
+ assert msg != null;
+
+ if (isDone())
+ return;
+
+ boolean allReceived = false;
+
+ synchronized (mux) {
+ if (remaining.remove(nodeId))
+ allReceived = remaining.isEmpty();
+
+ resps.put(nodeId, msg);
+ }
+
+ if (allReceived)
+ onAllReceived();
+ }
+
+ /**
+ *
+ */
+ private void onAllReceived() {
+ Throwable e = new Throwable();
+
+ boolean fail = false;
+
+ for (Map.Entry<UUID, GridChangeGlobalStateMessageResponse> entry : resps.entrySet()) {
+ GridChangeGlobalStateMessageResponse r = entry.getValue();
+
+ if (r.getError() != null) {
+ fail = true;
+
+ e.addSuppressed(r.getError());
+ }
+ }
+
+ if (fail)
+ onDone(e);
+ else
+ onDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+ ctx.state().cgsLocFut.set(null);
+
+ return super.onDone(res, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridChangeGlobalStateFuture.class, this);
+ }
+ }
+
+ /**
+ *
+ *
+ */
+ private static class ChangeGlobalStateContext {
+ /** Request id. */
+ private final UUID requestId;
+
+ /** Initiating node id. */
+ private final UUID initiatingNodeId;
+
+ /** Batch requests. */
+ private final DynamicCacheChangeBatch batch;
+
+ /** Activate. */
+ private final boolean activate;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Fail. */
+ private boolean fail;
+
+ /** Async activate future. */
+ private IgniteInternalFuture<?> asyncActivateFut;
+
+ /**
+ *
+ */
+ public ChangeGlobalStateContext(
+ UUID requestId,
+ UUID initiatingNodeId,
+ DynamicCacheChangeBatch batch,
+ boolean activate
+ ) {
+ this.requestId = requestId;
+ this.batch = batch;
+ this.activate = activate;
+ this.initiatingNodeId = initiatingNodeId;
+ }
+
+ /**
+ * @param topVer Topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /**
+ *
+ */
+ private void setFail() {
+ fail = true;
+ }
+
+ /**
+ *
+ */
+ private boolean isFail() {
+ return fail;
+ }
+
+ /**
+ *
+ */
+ public IgniteInternalFuture<?> getAsyncActivateFut() {
+ return asyncActivateFut;
+ }
+
+ /**
+ * @param asyncActivateFut Async activate future.
+ */
+ public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) {
+ this.asyncActivateFut = asyncActivateFut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ChangeGlobalStateContext.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Activation. */
+ private final boolean activation;
+
+ /** Ignite. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ *
+ */
+ private ClientChangeGlobalStateComputeRequest(boolean activation) {
+ this.activation = activation;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ignite.active(activation);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 5e15b46,3178e92..57ae7c6
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@@ -52,10 -39,18 +49,8 @@@ import org.apache.ignite.lang.IgniteClo
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
- import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
- import static org.apache.ignite.igfs.IgfsMode.PROXY;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 0000000,66c19a0..6d59f89
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@@ -1,0 -1,363 +1,363 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.marshaller;
+
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.cluster.ClusterNode;
+ import org.apache.ignite.events.DiscoveryEvent;
+ import org.apache.ignite.events.Event;
+ import org.apache.ignite.internal.GridKernalContext;
+ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+ import org.apache.ignite.internal.MarshallerContextImpl;
+ import org.apache.ignite.internal.managers.communication.GridIoManager;
+ import org.apache.ignite.internal.managers.communication.GridMessageListener;
+ import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+ import org.apache.ignite.internal.processors.GridProcessorAdapter;
+ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+ import org.apache.ignite.internal.util.future.GridFutureAdapter;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgniteFuture;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+ import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentHashMap8;
+
+ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
+ import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+ /**
+ * Processor responsible for managing custom {@link DiscoveryCustomMessage}
+ * events for exchanging marshalling mappings between nodes in grid.
+ *
+ * In particular it processes two flows:
+ * <ul>
+ * <li>
+ * Some node, server or client, wants to add new mapping for some class.
+ * In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used.
+ * </li>
+ * <li>
+ * As discovery events are delivered to clients asynchronously,
+ * client node may not have some mapping when server nodes in the grid are already allowed to use the mapping.
+ * In that situation client sends a {@link MissingMappingRequestMessage} request
+ * and processor handles it as well as {@link MissingMappingResponseMessage} message.
+ * </li>
+ * </ul>
+ */
+ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
+ /** */
+ private final MarshallerContextImpl marshallerCtx;
+
+ /** */
+ private final GridClosureProcessor closProc;
+
+ /** */
+ private final List<MappingUpdatedListener> mappingUpdatedLsnrs = new CopyOnWriteArrayList<>();
+
+ /** */
+ private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap
+ = new ConcurrentHashMap8<>();
+
+ /** */
+ private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public GridMarshallerMappingProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ marshallerCtx = ctx.marshallerContext();
+
+ closProc = ctx.closure();
+ }
+
+ /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
++ @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+ GridDiscoveryManager discoMgr = ctx.discovery();
+ GridIoManager ioMgr = ctx.io();
+
+ MarshallerMappingTransport transport = new MarshallerMappingTransport(
+ ctx,
+ mappingExchangeSyncMap,
+ clientReqSyncMap
+ );
++
+ marshallerCtx.onMarshallerProcessorStarted(ctx, transport);
+
+ discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener());
+
+ discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener());
+
+ if (ctx.clientNode())
+ ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener());
+ else
+ ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr));
+
+ if (ctx.clientNode())
+ ctx.event().addLocalEventListener(new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+
+ if (!ctx.isStopping()) {
+ for (ClientRequestFuture fut : clientReqSyncMap.values())
+ fut.onNodeLeft(evt0.eventNode().id());
+ }
+ }
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ }
+
+ /**
+ * Adds a listener to be notified when mapping changes.
+ *
+ * @param mappingUpdatedListener listener for mapping updated events.
+ */
+ public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) {
+ mappingUpdatedLsnrs.add(mappingUpdatedListener);
+ }
+
+ /**
+ * Gets an iterator over all current mappings.
+ *
+ * @return Iterator over current mappings.
+ */
+ public Iterator<Map.Entry<Byte, Map<Integer, String>>> currentMappings() {
+ return marshallerCtx.currentMappings();
+ }
+
+ /**
+ *
+ */
+ private final class MissingMappingRequestListener implements GridMessageListener {
+ /** */
+ private final GridIoManager ioMgr;
+
+ /**
+ * @param ioMgr Io manager.
+ */
+ MissingMappingRequestListener(GridIoManager ioMgr) {
+ this.ioMgr = ioMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MissingMappingRequestMessage : msg;
+
+ MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
+
+ byte platformId = msg0.platformId();
+ int typeId = msg0.typeId();
+
+ String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
+
+ try {
+ ioMgr.sendToGridTopic(
+ nodeId,
+ TOPIC_MAPPING_MARSH,
+ new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
+ SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send missing mapping response.", e);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private final class MissingMappingResponseListener implements GridMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MissingMappingResponseMessage : msg;
+
+ MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;
+
+ byte platformId = msg0.platformId();
+ int typeId = msg0.typeId();
+ String resolvedClsName = msg0.className();
+
+ MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null);
+
+ GridFutureAdapter<MappingExchangeResult> fut = clientReqSyncMap.get(item);
+
+ if (fut != null) {
+ if (resolvedClsName != null) {
+ marshallerCtx.onMissedMappingResolved(item, resolvedClsName);
+
+ fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
+ }
+ else
+ fut.onDone(MappingExchangeResult.createFailureResult(
+ new IgniteCheckedException(
+ "Failed to resolve mapping [platformId: "
+ + platformId
+ + ", typeId: "
+ + typeId + "]")));
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> {
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer,
+ ClusterNode snd,
+ MappingProposedMessage msg
+ ) {
+ if (!ctx.isStopping()) {
+ if (msg.duplicated())
+ return;
+
+ if (!msg.inConflict()) {
+ MarshallerMappingItem item = msg.mappingItem();
+ String conflictingName = marshallerCtx.onMappingProposed(item);
+
+ if (conflictingName != null) {
+ if (conflictingName.equals(item.className()))
+ msg.markDuplicated();
+ else
+ msg.conflictingWithClass(conflictingName);
+ }
+ }
+ else {
+ UUID origNodeId = msg.origNodeId();
+
+ if (origNodeId.equals(ctx.localNodeId())) {
+ GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem());
+
+ assert fut != null: msg;
+
+ fut.onDone(MappingExchangeResult.createFailureResult(
+ duplicateMappingException(msg.mappingItem(), msg.conflictingClassName())));
+ }
+ }
+ }
+ }
+
+ /**
+ * @param mappingItem Mapping item.
+ * @param conflictingClsName Conflicting class name.
+ */
+ private IgniteCheckedException duplicateMappingException(
+ MarshallerMappingItem mappingItem,
+ String conflictingClsName
+ ) {
+ return new IgniteCheckedException("Duplicate ID [platformId="
+ + mappingItem.platformId()
+ + ", typeId="
+ + mappingItem.typeId()
+ + ", oldCls="
+ + conflictingClsName
+ + ", newCls="
+ + mappingItem.className() + "]");
+ }
+ }
+
+ /**
+ *
+ */
+ private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> {
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer,
+ ClusterNode snd,
+ MappingAcceptedMessage msg
+ ) {
+ final MarshallerMappingItem item = msg.getMappingItem();
+ marshallerCtx.onMappingAccepted(item);
+
+ closProc.runLocalSafe(new Runnable() {
+ @Override public void run() {
+ for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs)
+ lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className());
+ }
+ });
+
+ GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
+
+ if (fut != null)
+ fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal()))
+ dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>) data.commonData();
+
+ if (mappings != null) {
+ for (int i = 0; i < mappings.size(); i++) {
+ Map<Integer, MappedName> map;
+
+ if ((map = mappings.get(i)) != null)
+ marshallerCtx.onMappingDataReceived((byte) i, map);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException(
+ ctx.cluster().clientReconnectFuture(),
+ "Failed to propose or request mapping, client node disconnected.")));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ marshallerCtx.onMarshallerProcessorStop();
+
+ cancelFutures(MappingExchangeResult.createExchangeDisabledResult());
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return MARSHALLER_PROC;
+ }
+
+ /**
+ * @param res Response.
+ */
+ private void cancelFutures(MappingExchangeResult res) {
+ for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values())
+ fut.onDone(res);
+
+ for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values())
+ fut.onDone(res);
+ }
+ }