You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/15 10:45:12 UTC
[47/50] [abbrv] ignite git commit: Merge 2.0
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f05fdf7,cd4c55c..0309fa7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -403,7 -400,7 +404,8 @@@ public abstract class IgniteTxLocalAdap
boolean skipVals,
boolean needVer,
boolean keepBinary,
+ boolean recovery,
+ final ExpiryPolicy expiryPlc,
final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
) {
assert cacheCtx.isLocal() : cacheCtx.name();
@@@ -431,9 -430,11 +435,9 @@@
continue;
try {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
this,
- /*readSwap*/true,
- /*unmarshal*/true,
/*update-metrics*/!skipVals,
/*event*/!skipVals,
CU.subjectId(this, cctx),
@@@ -1246,9 -1222,11 +1254,9 @@@
F.first(txEntry.entryProcessors()) : null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned(
+ EntryGetResult res = txEntry.cached().innerGetVersioned(
null,
this,
- /*swap*/true,
- /*unmarshal*/true,
/*update-metrics*/true,
/*event*/!skipVals,
CU.subjectId(this, cctx),
@@@ -1499,7 -1486,7 +1511,8 @@@
skipVals,
needReadVer,
!deserializeBinary,
+ recovery,
+ expiryPlc,
new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
@Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
if (isRollbackOnly()) {
@@@ -1675,10 -1665,12 +1692,10 @@@
F.first(txEntry.entryProcessors()) : null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned(
+ EntryGetResult res = cached.innerGetVersioned(
null,
IgniteTxLocalAdapter.this,
- /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
- /*update-metrics*/true,
+ /**update-metrics*/true,
/*event*/!skipVals,
CU.subjectId(IgniteTxLocalAdapter.this, cctx),
transformClo,
@@@ -2041,7 -2035,7 +2061,8 @@@
/*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary,
- expiryPlc);
++ expiryPlc,
+ recovery);
}
return new GridFinishedFuture<>();
@@@ -2212,7 -2205,7 +2233,8 @@@
/*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary,
- expiryPlc);
++ expiryPlc,
+ recovery);
}
return new GridFinishedFuture<>();
@@@ -2232,8 -2225,7 +2254,9 @@@
* @param hasFilters {@code True} if filters not empty.
* @param readThrough Read through flag.
* @param retval Return value flag.
+ * @param keepBinary Keep binary flag.
+ * @param expiryPlc Expiry policy.
+ * @param recovery Recovery flag.
* @return Load future.
*/
private IgniteInternalFuture<Void> loadMissing(
@@@ -2248,8 -2240,7 +2271,9 @@@
final boolean readThrough,
final boolean retval,
final boolean keepBinary,
- final ExpiryPolicy expiryPlc) {
++ final ExpiryPolicy expiryPlc,
+ final boolean recovery
+ ) {
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
@Override public void apply(KeyCacheObject key,
@@@ -2323,7 -2314,7 +2347,8 @@@
/*skipVals*/singleRmv,
needReadVer,
keepBinary,
+ recovery,
+ expiryPlc,
c);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index d6b09e4,f5687a0..cf1e7e2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@@ -192,6 -194,6 +194,7 @@@ public interface IgniteTxLocalEx extend
boolean skipVals,
boolean needVer,
boolean keepBinary,
+ boolean recovery,
+ final ExpiryPolicy expiryPlc,
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 124cb4b,d1c8b2d..a32b9d8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@@ -121,8 -139,27 +139,27 @@@ public class ClusterProcessor extends G
}
}
+
+     /**
+      * Walks the given values in iteration order and returns the value stored under
+      * {@code ATTR_UPDATE_NOTIFIER_STATUS} by the last element that contains that key.
+      *
+      * @param vals Collection to seek through; {@code null} elements are skipped.
+      * @return Value found in the last element containing the key (may itself be {@code null}),
+      *      or {@code null} if no element contains the key.
+      */
+     private Boolean findLastFlag(Collection<Serializable> vals) {
+         Boolean lastSeen = null;
+
+         for (Serializable item : vals) {
+             if (item == null)
+                 continue;
+
+             Map<String, Object> attrs = (Map<String, Object>) item;
+
+             if (!attrs.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
+                 continue;
+
+             // Later elements override earlier ones.
+             lastSeen = (Boolean) attrs.get(ATTR_UPDATE_NOTIFIER_STATUS);
+         }
+
+         return lastSeen;
+     }
+
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
if (notifyEnabled.get()) {
try {
verChecker = new GridUpdateNotifier(ctx.gridName(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 87d54a1,0000000..247d469
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@@ -1,947 -1,0 +1,955 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.ClusterState;
+import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
++import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION;
+
+/**
+ *
+ */
+public class GridClusterStateProcessor extends GridProcessorAdapter {
+ /** Global status. */
+ private volatile ClusterState globalState;
+
+ /** Action context. */
+ private volatile ChangeGlobalStateContext lastCgsCtx;
+
+ /** Local action future. */
+ private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>();
+
+ /** Process. */
+ @GridToStringExclude
+ private GridCacheProcessor cacheProc;
+
+ /** Shared context. */
+ @GridToStringExclude
+ private GridCacheSharedContext<?, ?> sharedCtx;
+
+ //todo may be add init latch
+
+ /** Listener. */
+ private final GridLocalEventListener lsr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt != null;
+
+ final DiscoveryEvent e = (DiscoveryEvent)evt;
+
+ assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
+
+ final GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+ if (f != null)
+ f.initFut.listen(new CI1<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ f.onDiscoveryEvent(e);
+ }
+ });
+ }
+ };
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public GridClusterStateProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+     /**
+      * Starts the processor: derives the initial global state from {@code activeOnStart}, registers the
+      * handler for state change responses, the discovery listener for {@link ChangeGlobalStateMessage}
+      * and the local listener for node left/failed events.
+      *
+      * @param activeOnStart {@code True} to start in {@code ACTIVE} state, {@code false} for {@code INACTIVE}.
+      * @throws IgniteCheckedException If start fails.
+      */
+     @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+         super.start(activeOnStart);
+
+         globalState = activeOnStart ? ACTIVE : INACTIVE;
+         cacheProc = ctx.cache();
+         sharedCtx = cacheProc.context();
+
+         sharedCtx.io().addHandler(0,
+             GridChangeGlobalStateMessageResponse.class,
+             new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
+                 @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
+                     processChangeGlobalStateResponse(nodeId, msg);
+                 }
+             });
+
+         ctx.discovery().setCustomEventListener(
+             ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() {
+                 @Override public void onCustomEvent(
+                     AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) {
+                     assert topVer != null;
+                     assert snd != null;
+                     assert msg != null;
+
+                     boolean activate = msg.activate();
+
+                     ChangeGlobalStateContext actx = lastCgsCtx;
+
+                     // A context is already set and the state is in transition: reject the new request.
+                     if (actx != null && globalState == TRANSITION) {
+                         GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+                         if (log.isDebugEnabled())
+                             log.debug("Concurrent " + prettyStr(activate) + " [id=" +
+                                 ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]");
+
+                         // Report the operation that is really in progress (from the stored context),
+                         // not the one carried by the rejected request.
+                         if (f != null && f.requestId.equals(msg.requestId()))
+                             f.onDone(new IgniteCheckedException(
+                                 "Concurrent change state, now in progress=" + prettyStr(actx.activate)
+                                     + ", initiatingNodeId=" + actx.initiatingNodeId
+                                     + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId()
+                             ));
+
+                         msg.concurrentChangeState();
+                     }
+                     else {
+                         if (log.isInfoEnabled())
+                             log.info("Create " + prettyStr(activate) + " context [id=" +
+                                 ctx.localNodeId() + " topVer=" + topVer + ", reqId=" +
+                                 msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]");
+
+                         lastCgsCtx = new ChangeGlobalStateContext(
+                             msg.requestId(),
+                             msg.initiatorNodeId(),
+                             msg.getDynamicCacheChangeBatch(),
+                             msg.activate());
+
+                         globalState = TRANSITION;
+                     }
+                 }
+             });
+
+         ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+     }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ super.stop(cancel);
+
+ sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class);
+ ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
+ "Node is stopping: " + ctx.gridName());
+
+ GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+ if (f != null)
+ f.onDone(stopErr);
+
+ cgsLocFut.set(null);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return DiscoveryDataExchangeType.STATE_PROC;
+ }
+
+ /** {@inheritDoc} */
- @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
- return globalState;
++ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
++ dataBag.addJoiningNodeData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState);
+ }
+
+ /** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
- if (ctx.localNodeId().equals(joiningNodeId))
- globalState = (ClusterState)data;
++ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
++ dataBag.addGridCommonData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState);
++ }
++
++ /** {@inheritDoc} */
++ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
++ globalState = (ClusterState)data.commonData();
++ }
++
++ /** {@inheritDoc} */
++ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
++ // No-op.
+ }
+
+ /**
+ *
+ */
+ public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
+ if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
+ throw new IgniteException("Cannot " + prettyStr(activate) + " cluster, because cache locked on transaction.");
+
+ if ((this.globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate))
+ return new GridFinishedFuture<>();
+
+ final UUID requestId = UUID.randomUUID();
+
+ final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx);
+
+ if (!cgsLocFut.compareAndSet(null, cgsFut)) {
+ GridChangeGlobalStateFuture locF = cgsLocFut.get();
+
+ if (locF.activate == activate)
+ return locF;
+ else
+ return new GridFinishedFuture<>(new IgniteException(
+ "fail " + prettyStr(activate) + ", because now in progress" + prettyStr(locF.activate)));
+ }
+
+ try {
+ if (ctx.clientNode()) {
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+
+ IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers())
+ .compute().withAsync();
+
+ if (log.isInfoEnabled())
+ log.info("Send " + prettyStr(activate) + " request from client node [id=" +
+ ctx.localNodeId() + " topVer=" + topVer + " ]");
+
+ comp.run(new ClientChangeGlobalStateComputeRequest(activate));
+
+ comp.future().listen(new CI1<IgniteFuture>() {
+ @Override public void apply(IgniteFuture fut) {
+ try {
+ fut.get();
+
+ cgsFut.onDone();
+ }
+ catch (Exception e) {
+ cgsFut.onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
+
+ DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
+ requestId, null, ctx.localNodeId());
+
+ changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
+
+ reqs.add(changeGlobalStateReq);
+
+ reqs.addAll(activate ? cacheProc.startAllCachesRequests() : cacheProc.stopAllCachesRequests());
+
+ ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
+ requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs));
+
+ try {
+ ctx.discovery().sendCustomEvent(changeGlobalStateMsg);
+
+ if (ctx.isStopping())
+ cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
+ "node is stopping."));
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Fail create or send change global state request." + cgsFut, e);
+
+ cgsFut.onDone(e);
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Fail create or send change global state request." + cgsFut, e);
+
+ cgsFut.onDone(e);
+ }
+
+ return cgsFut;
+ }
+
+     /**
+      * Tells whether the cluster is currently considered active. While a state change is in transition the
+      * previous state is reported: {@code true} during a deactivation, {@code false} during an activation.
+      *
+      * @return {@code True} if the cluster is considered active.
+      */
+     public boolean active() {
+         ChangeGlobalStateContext actx = lastCgsCtx;
+
+         // Single volatile read: a state change between repeated reads could otherwise yield a wrong answer.
+         ClusterState state = globalState;
+
+         if (actx != null && state == TRANSITION)
+             return !actx.activate;
+
+         return state == ACTIVE;
+     }
+
+     /**
+      * Looks for a global state change request in the batch and, if one is present, stores the given
+      * topology version in the current change context.
+      *
+      * @param reqs Requests.
+      * @param topVer Topology version to record in the change context.
+      * @return {@code True} if the batch contains a global state change request.
+      */
+     public boolean changeGlobalState(
+         Collection<DynamicCacheChangeRequest> reqs,
+         AffinityTopologyVersion topVer
+     ) {
+         assert !F.isEmpty(reqs);
+         assert topVer != null;
+
+         for (DynamicCacheChangeRequest candidate : reqs) {
+             if (!candidate.globalStateChange())
+                 continue;
+
+             ChangeGlobalStateContext curCtx = lastCgsCtx;
+
+             assert curCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+
+             curCtx.topologyVersion(topVer);
+
+             // The first matching request is enough.
+             return true;
+         }
+
+         return false;
+     }
+
+ /**
+ * Invoke from exchange future.
+ */
+ public Exception onChangeGlobalState() {
+ GridChangeGlobalStateFuture f = cgsLocFut.get();
+
+ ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+
+ assert cgsCtx != null;
+
+ if (f != null)
+ f.setRemaining(cgsCtx.topVer);
+
+ return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx);
+ }
+
+ /**
+ * @param exs Exs.
+ */
+ public void onFullResponseMessage(Map<UUID, Exception> exs) {
+ assert !F.isEmpty(exs);
+
+ ChangeGlobalStateContext actx = lastCgsCtx;
+
+ actx.setFail();
+
+ // revert change if activation request fail
+ if (actx.activate) {
+ try {
+ cacheProc.onKernalStopCaches(true);
+
+ cacheProc.stopCaches(true);
+
+ sharedCtx.affinity().removeAllCacheInfo();
+
+ if (!ctx.clientNode()) {
+ sharedCtx.database().onDeActivate(ctx);
+
+ if (sharedCtx.pageStore() != null)
+ sharedCtx.pageStore().onDeActivate(ctx);
+
+ if (sharedCtx.wal() != null)
+ sharedCtx.wal().onDeActivate(ctx);
+ }
+ }
+ catch (Exception e) {
+ for (Map.Entry<UUID, Exception> entry : exs.entrySet())
+ e.addSuppressed(entry.getValue());
+
+ log.error("Fail while revert activation request changes", e);
+ }
+ }
+ else {
+ //todo revert change if deactivate request fail
+ }
+
+ globalState = actx.activate ? INACTIVE : ACTIVE;
+
+ GridChangeGlobalStateFuture af = cgsLocFut.get();
+
+ if (af != null && af.requestId.equals(actx.requestId)) {
+ IgniteCheckedException e = new IgniteCheckedException("see suppressed");
+
+ for (Map.Entry<UUID, Exception> entry : exs.entrySet())
+ e.addSuppressed(entry.getValue());
+
+ af.onDone(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private Exception onActivate(ChangeGlobalStateContext cgsCtx) {
+ final boolean client = ctx.clientNode();
+
+ if (log.isInfoEnabled())
+ log.info("Start activation process [nodeId=" + this.ctx.localNodeId() + ", client=" + client +
+ ", topVer=" + cgsCtx.topVer + "]");
+
+ Collection<CacheConfiguration> cfgs = new ArrayList<>();
+
+ for (DynamicCacheChangeRequest req : cgsCtx.batch.requests()) {
+ if (req.startCacheConfiguration() != null)
+ cfgs.add(req.startCacheConfiguration());
+ }
+
+ try {
+ if (!client) {
+ sharedCtx.database().lock();
+
+ IgnitePageStoreManager pageStore = sharedCtx.pageStore();
+
+ if (pageStore != null)
+ pageStore.onActivate(ctx);
+
+ if (sharedCtx.wal() != null)
+ sharedCtx.wal().onActivate(ctx);
+
+ sharedCtx.database().initDataBase();
+
+ for (CacheConfiguration cfg : cfgs) {
+ if (CU.isSystemCache(cfg.getName()))
+ if (pageStore != null)
+ pageStore.initializeForCache(cfg);
+ }
+
+ for (CacheConfiguration cfg : cfgs) {
+ if (!CU.isSystemCache(cfg.getName()))
+ if (pageStore != null)
+ pageStore.initializeForCache(cfg);
+ }
+
+ sharedCtx.database().onActivate(ctx);
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Success activate wal, dataBase, pageStore [nodeId="
+ + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+
+ return null;
+ }
+ catch (Exception e) {
+ log.error("Fail activate wal, dataBase, pageStore [nodeId=" + ctx.localNodeId() + ", client=" + client +
+ ", topVer=" + cgsCtx.topVer + "]", e);
+
+ if (!ctx.clientNode())
+ sharedCtx.database().unLock();
+
+ return e;
+ }
+ }
+
+ /**
+ *
+ */
+ public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) {
+ final boolean client = ctx.clientNode();
+
+ if (log.isInfoEnabled())
+ log.info("Start deactivate process [id=" + ctx.localNodeId() + ", client=" +
+ client + ", topVer=" + cgsCtx.topVer + "]");
+
+ try {
+ ctx.dataStructures().onDeActivate(ctx);
+
+ ctx.service().onDeActivate(ctx);
+
+ if (log.isInfoEnabled())
+ log.info("Success deactivate services, dataStructures, database, pageStore, wal [id=" + ctx.localNodeId() + ", client=" +
+ client + ", topVer=" + cgsCtx.topVer + "]");
+
+ return null;
+ }
+ catch (Exception e) {
+ log.error("DeActivation fail [nodeId=" + ctx.localNodeId() + ", client=" + client +
+ ", topVer=" + cgsCtx.topVer + "]", e);
+
+ return e;
+ }
+ finally {
+ if (!client)
+ sharedCtx.database().unLock();
+ }
+ }
+
+     /**
+      * Submits the final activation step through {@code ctx.closure().runLocalSafe(...)} and stores the
+      * resulting future in the given context. Whatever the outcome, the step sets the global state to
+      * {@code ACTIVE}, sends the response to the initiating node and clears the current change context.
+      *
+      * @param cgsCtx Context of the state change being finished.
+      */
+     private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) {
+         IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() {
+             @Override public void run() {
+                 boolean client = ctx.clientNode();
+
+                 Exception e = null;
+
+                 try {
-  ctx.marshallerContext().onMarshallerCacheStarted(ctx);
-
+                     if (!ctx.config().isDaemon())
+                         ctx.cacheObjects().onUtilityCacheStarted();
+
+                     ctx.service().onUtilityCacheStarted();
+
+                     ctx.service().onActivate(ctx);
+
+                     ctx.dataStructures().onActivate(ctx);
+
+                     if (log.isInfoEnabled())
+                         log.info("Success final activate [nodeId="
+                             + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+                 }
+                 catch (Exception ex) {
+                     e = ex;
+
+                     // Use the captured context: the shared lastCgsCtx field may already have been cleared.
+                     log.error("Fail activate finished [nodeId=" + ctx.localNodeId() + ", client=" + client +
+                         ", topVer=" + cgsCtx.topVer + "]", ex);
+                 }
+                 finally {
+                     globalState = ACTIVE;
+
+                     sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e);
+
+                     GridClusterStateProcessor.this.lastCgsCtx = null;
+                 }
+             }
+         });
+
+         cgsCtx.setAsyncActivateFut(asyncActivateFut);
+     }
+
+     /**
+      * Performs the final deactivation step. On server nodes it deactivates the database, the page store
+      * and the WAL (the latter two when present) and removes all cache affinity info. Whatever the outcome,
+      * the global state becomes {@code INACTIVE}, the response is sent to the initiating node and the
+      * current change context is cleared.
+      *
+      * @param cgsCtx Context of the state change being finished.
+      */
+     public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) {
+         final boolean client = ctx.clientNode();
+
+         Exception ex = null;
+
+         try {
+             if (!client) {
+                 sharedCtx.database().onDeActivate(ctx);
+
+                 if (sharedCtx.pageStore() != null)
+                     sharedCtx.pageStore().onDeActivate(ctx);
+
+                 if (sharedCtx.wal() != null)
+                     sharedCtx.wal().onDeActivate(ctx);
+
+                 sharedCtx.affinity().removeAllCacheInfo();
+             }
+
+             // Report success only after the deactivation work has actually completed.
+             if (log.isInfoEnabled())
+                 log.info("Success final deactivate [nodeId="
+                     + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+         }
+         catch (Exception e) {
+             ex = e;
+
+             // The error is also sent to the initiator below; log it locally so it is not lost on this node.
+             log.error("Fail final deactivate [nodeId=" + ctx.localNodeId() + ", client=" + client +
+                 ", topVer=" + cgsCtx.topVer + "]", e);
+         }
+         finally {
+             globalState = INACTIVE;
+         }
+
+         sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex);
+
+         this.lastCgsCtx = null;
+     }
+
+     /**
+      * Finishes the current state change: a failed change only drops the change context, a successful
+      * one proceeds to the final activation or deactivation step.
+      */
+     public void onExchangeDone() {
+         ChangeGlobalStateContext curCtx = lastCgsCtx;
+
+         assert curCtx != null;
+
+         if (curCtx.isFail()) {
+             lastCgsCtx = null;
+
+             return;
+         }
+
+         if (curCtx.activate)
+             onFinalActivate(curCtx);
+         else
+             onFinalDeActivate(curCtx);
+     }
+
+ /**
+ * @param initNodeId Initialize node id.
+ * @param ex Exception.
+ */
+ private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) {
+ assert requestId != null;
+ assert initNodeId != null;
+
+ try {
+ GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex);
+
+ if (log.isDebugEnabled())
+ log.debug("Send change global state response [nodeId=" + ctx.localNodeId() +
+ ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]");
+
+ if (ctx.localNodeId().equals(initNodeId))
+ processChangeGlobalStateResponse(ctx.localNodeId(), actResp);
+ else
+ sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Fail send change global state response to " + initNodeId, e);
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
+ assert nodeId != null;
+ assert msg != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Received activation response [requestId=" + msg.getRequestId() +
+ ", nodeId=" + nodeId + "]");
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" +
+ msg.getRequestId() + ']');
+
+ return;
+ }
+
+ UUID requestId = msg.getRequestId();
+
+ final GridChangeGlobalStateFuture fut = cgsLocFut.get();
+
+ if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) {
+ fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ fut.onResponse(nodeId, msg);
+ }
+ });
+ }
+ }
+
+
+
+ /**
+ * @param activate Activate.
+ */
+ private String prettyStr(boolean activate) {
+ return activate ? "activate" : "deactivate";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridClusterStateProcessor.class, this);
+ }
+
+ /**
+ *
+ */
+ private static class GridChangeGlobalStateFuture extends GridFutureAdapter {
+ /** Request id. */
+ @GridToStringInclude
+ private final UUID requestId;
+
+ /** Activate. */
+ private final boolean activate;
+
+ /** Nodes. */
+ @GridToStringInclude
+ private final Set<UUID> remaining = new HashSet<>();
+
+ /** Responses. */
+ @GridToStringInclude
+ private final Map<UUID, GridChangeGlobalStateMessageResponse> resps = new HashMap<>();
+
+ /** Context. */
+ @GridToStringExclude
+ private final GridKernalContext ctx;
+
+ /** */
+ @GridToStringExclude
+ private final Object mux = new Object();
+
+ /** */
+ @GridToStringInclude
+ private final GridFutureAdapter initFut = new GridFutureAdapter();
+
+ /** Grid logger. */
+ @GridToStringExclude
+ private final IgniteLogger log;
+
+ /**
+ *
+ */
+ public GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
+ this.requestId = requestId;
+ this.activate = activate;
+ this.ctx = ctx;
+ this.log = ctx.log(getClass());
+ }
+
+ /**
+ * @param event Event.
+ */
+ public void onDiscoveryEvent(DiscoveryEvent event) {
+ assert event != null;
+
+ if (isDone())
+ return;
+
+ boolean allReceived = false;
+
+ synchronized (mux) {
+ if (remaining.remove(event.eventNode().id()))
+ allReceived = remaining.isEmpty();
+ }
+
+ if (allReceived)
+ onAllReceived();
+ }
+
+ /**
+ *
+ */
+ public void setRemaining(AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
+
+ List<UUID> ids = new ArrayList<>(nodes.size());
+
+ for (ClusterNode n : nodes)
+ ids.add(n.id());
+
+ if (log.isDebugEnabled())
+ log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" +
+ ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() +
+ ", nodes=" + Arrays.toString(ids.toArray()) + "]");
+
+ synchronized (mux) {
+ remaining.addAll(ids);
+ }
+
+ initFut.onDone();
+ }
+
+ /**
+ * @param msg Activation message response.
+ */
+ public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
+ assert msg != null;
+
+ if (isDone())
+ return;
+
+ boolean allReceived = false;
+
+ synchronized (mux) {
+ if (remaining.remove(nodeId))
+ allReceived = remaining.isEmpty();
+
+ resps.put(nodeId, msg);
+ }
+
+ if (allReceived)
+ onAllReceived();
+ }
+
+         /**
+          * Completes this future once no awaited nodes remain: successfully if no collected response
+          * carries an error, otherwise with an {@link IgniteCheckedException} that has every reported
+          * error attached as suppressed.
+          */
+         private void onAllReceived() {
+             IgniteCheckedException err = null;
+
+             // Responses are written under the same monitor, so read them under it as well.
+             synchronized (mux) {
+                 for (GridChangeGlobalStateMessageResponse r : resps.values()) {
+                     if (r.getError() == null)
+                         continue;
+
+                     if (err == null)
+                         err = new IgniteCheckedException("Failed to " + (activate ? "activate" : "deactivate") +
+                             " cluster (see suppressed exceptions) [requestId=" + requestId + ']');
+
+                     err.addSuppressed(r.getError());
+                 }
+             }
+
+             if (err != null)
+                 onDone(err);
+             else
+                 onDone();
+         }
+
+         /**
+          * Completes the future and releases the processor's local future slot, but only if the slot
+          * still refers to this future, so that a different registered future is never erased.
+          *
+          * @param res Result.
+          * @param err Error.
+          * @return Value returned by the parent implementation.
+          */
+         @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+             ctx.state().cgsLocFut.compareAndSet(this, null);
+
+             return super.onDone(res, err);
+         }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridChangeGlobalStateFuture.class, this);
+ }
+ }
+
+ /**
+ *
+ *
+ */
+ private static class ChangeGlobalStateContext {
+ /** Request id. */
+ private final UUID requestId;
+
+ /** Initiating node id. */
+ private final UUID initiatingNodeId;
+
+ /** Batch requests. */
+ private final DynamicCacheChangeBatch batch;
+
+ /** Activate. */
+ private final boolean activate;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Fail. */
+ private boolean fail;
+
+ /** Async activate future. */
+ private IgniteInternalFuture<?> asyncActivateFut;
+
+ /**
+ *
+ */
+ public ChangeGlobalStateContext(
+ UUID requestId,
+ UUID initiatingNodeId,
+ DynamicCacheChangeBatch batch,
+ boolean activate
+ ) {
+ this.requestId = requestId;
+ this.batch = batch;
+ this.activate = activate;
+ this.initiatingNodeId = initiatingNodeId;
+ }
+
+ /**
+ * @param topVer Topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /**
+ *
+ */
+ private void setFail() {
+ fail = true;
+ }
+
+ /**
+ *
+ */
+ private boolean isFail() {
+ return fail;
+ }
+
+ /**
+ *
+ */
+ public IgniteInternalFuture<?> getAsyncActivateFut() {
+ return asyncActivateFut;
+ }
+
+ /**
+ * @param asyncActivateFut Async activate future.
+ */
+ public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) {
+ this.asyncActivateFut = asyncActivateFut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ChangeGlobalStateContext.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Activation. */
+ private final boolean activation;
+
+ /** Ignite. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ *
+ */
+ private ClientChangeGlobalStateComputeRequest(boolean activation) {
+ this.activation = activation;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ignite.active(activation);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9012214,f97fc14..df2f7d9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@@ -1925,59 -1920,24 +1923,59 @@@ public class DataStreamerImpl<K, V> imp
ExpiryPolicy plc = cctx.expiry();
- for (Entry<KeyCacheObject, CacheObject> e : entries) {
- try {
- e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+ Collection<Integer> reservedParts = new HashSet<>();
+ Collection<Integer> ignoredParts = new HashSet<>();
- GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
+ try {
+ for (Entry<KeyCacheObject, CacheObject> e : entries) {
+ cctx.shared().database().checkpointReadLock();
- if (plc != null) {
- ttl = CU.toTtl(plc.getExpiryForCreation());
+ try {
+ e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
- if (ttl == CU.TTL_ZERO)
- continue;
- else if (ttl == CU.TTL_NOT_CHANGED)
- ttl = 0;
+ if (!cctx.isLocal()) {
+ int p = cctx.affinity().partition(e.getKey());
- expiryTime = CU.toExpireTime(ttl);
- }
+ if (ignoredParts.contains(p))
+ continue;
+
+ if (!reservedParts.contains(p)) {
+ GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, true);
+
+ if (!part.reserve()) {
+ ignoredParts.add(p);
+
+ continue;
+ }
+ else {
+ // We must not allow to read from RENTING partitions.
+ if (part.state() == GridDhtPartitionState.RENTING) {
+ part.release();
+
+ ignoredParts.add(p);
+
+ continue;
+ }
+
+ reservedParts.add(p);
+ }
+ }
+ }
+
+ GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
+
+ if (plc != null) {
+ ttl = CU.toTtl(plc.getExpiryForCreation());
+
+ if (ttl == CU.TTL_ZERO)
+ continue;
+ else if (ttl == CU.TTL_NOT_CHANGED)
+ ttl = 0;
+
+ expiryTime = CU.toExpireTime(ttl);
+ }
- boolean primary = cctx.affinity().primary(cctx.localNode(), entry.key(), topVer);
+ boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer);
entry.initialValue(e.getValue(),
ver,
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 0000000,fdea869..3d2f86e
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@@ -1,0 -1,363 +1,363 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.marshaller;
+
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.ExecutorService;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.cluster.ClusterNode;
+ import org.apache.ignite.events.DiscoveryEvent;
+ import org.apache.ignite.events.Event;
+ import org.apache.ignite.internal.GridKernalContext;
+ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+ import org.apache.ignite.internal.MarshallerContextImpl;
+ import org.apache.ignite.internal.managers.communication.GridIoManager;
+ import org.apache.ignite.internal.managers.communication.GridMessageListener;
+ import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+ import org.apache.ignite.internal.processors.GridProcessorAdapter;
+ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+ import org.apache.ignite.internal.util.future.GridFutureAdapter;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgniteFuture;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+ import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+ import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentHashMap8;
+
+ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
+ import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+ /**
+ * Processor responsible for managing custom {@link DiscoveryCustomMessage}
+ * events for exchanging marshalling mappings between nodes in grid.
+ *
+ * In particular it processes two flows:
+ * <ul>
+ * <li>
+ * Some node, server or client, wants to add new mapping for some class.
+ * In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used.
+ * </li>
+ * <li>
+ * As discovery events are delivered to clients asynchronously,
+ * client node may not have some mapping when server nodes in the grid are already allowed to use the mapping.
+ * In that situation client sends a {@link MissingMappingRequestMessage} request
+ * and processor handles it as well as {@link MissingMappingResponseMessage} message.
+ * </li>
+ * </ul>
+ */
+ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
+ /** Marshaller context that holds the mappings; taken from {@code ctx.marshallerContext()}. */
+ private final MarshallerContextImpl marshallerCtx;
+
+ /** Closure processor used to invoke mapping-updated listeners through {@code runLocalSafe}. */
+ private final GridClosureProcessor closProc;
+
+ /** Listeners invoked from {@link MappingAcceptedListener} whenever a mapping is accepted. */
+ private final List<MappingUpdatedListener> mappingUpdatedLsnrs = new CopyOnWriteArrayList<>();
+
+ /**
+ * Pending mapping exchange futures keyed by mapping item. Completed by the discovery listeners of this
+ * processor and by {@link #cancelFutures(MappingExchangeResult)}; also handed to the transport in start.
+ */
+ private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap
+ = new ConcurrentHashMap8<>();
+
+ /**
+ * Pending missing-mapping request futures keyed by mapping item. Completed by
+ * {@link MissingMappingResponseListener} and {@link #cancelFutures(MappingExchangeResult)}, and notified about
+ * node left/failed events on client nodes.
+ */
+ private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+
+ /**
+ * Creates the processor and caches the marshaller context and closure processor of the given kernal context.
+ *
+ * @param ctx Kernal context.
+ */
+ public GridMarshallerMappingProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ marshallerCtx = ctx.marshallerContext();
+
+ closProc = ctx.closure();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Hands a {@link MarshallerMappingTransport} built over both sync maps to the marshaller context, registers
+ * discovery listeners for {@link MappingProposedMessage} and {@link MappingAcceptedMessage}, and registers a
+ * {@code TOPIC_MAPPING_MARSH} message listener: {@link MissingMappingResponseListener} on client nodes,
+ * {@link MissingMappingRequestListener} otherwise. Client nodes additionally subscribe to node left/failed
+ * events and pass the ID of the departed node to every pending client request future.
+ */
- @Override public void start() throws IgniteCheckedException {
++ @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+ GridDiscoveryManager discoMgr = ctx.discovery();
+ GridIoManager ioMgr = ctx.io();
+
+ MarshallerMappingTransport transport = new MarshallerMappingTransport(
+ ctx,
+ mappingExchangeSyncMap,
+ clientReqSyncMap
+ );
+ marshallerCtx.onMarshallerProcessorStarted(ctx, transport);
+
+ discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener());
+
+ discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener());
+
+ if (ctx.clientNode())
+ ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener());
+ else
+ ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr));
+
+ if (ctx.clientNode())
+ ctx.event().addLocalEventListener(new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+
+ if (!ctx.isStopping()) {
+ for (ClientRequestFuture fut : clientReqSyncMap.values())
+ fut.onNodeLeft(evt0.eventNode().id());
+ }
+ }
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ }
+
+ /**
+ * Adds a listener to be notified when mapping changes.
+ *
+ * @param mappingUpdatedListener listener for mapping updated events.
+ */
+ public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) {
+ mappingUpdatedLsnrs.add(mappingUpdatedListener);
+ }
+
+ /**
+ * Gets an iterator over all current mappings.
+ *
+ * @return Iterator over current mappings.
+ */
+ public Iterator<Map.Entry<Byte, Map<Integer, String>>> currentMappings() {
+ return marshallerCtx.currentMappings();
+ }
+
+ /**
+ * Listener for {@link MissingMappingRequestMessage} (registered on non-client nodes in start): resolves the
+ * requested mapping through the marshaller context and sends a {@link MissingMappingResponseMessage} with the
+ * result back to the requesting node. A failure to send is logged, not rethrown.
+ */
+ private final class MissingMappingRequestListener implements GridMessageListener {
+ /** IO manager used to send the response. */
+ private final GridIoManager ioMgr;
+
+ /**
+ * @param ioMgr Io manager.
+ */
+ MissingMappingRequestListener(GridIoManager ioMgr) {
+ this.ioMgr = ioMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MissingMappingRequestMessage : msg;
+
+ MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
+
+ byte platformId = msg0.platformId();
+ int typeId = msg0.typeId();
+
+ String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
+
+ try {
+ ioMgr.send(
+ nodeId,
+ TOPIC_MAPPING_MARSH,
+ new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
+ SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send missing mapping response.", e);
+ }
+ }
+ }
+
+ /**
+ * Listener for {@link MissingMappingResponseMessage} (registered on client nodes in start). Looks up the
+ * pending future for the item in {@code clientReqSyncMap}; if one exists, it is completed successfully (after
+ * the class name is passed to the marshaller context) when the response carries a class name, and with a
+ * failure result otherwise. Responses without a pending future are ignored.
+ */
+ private final class MissingMappingResponseListener implements GridMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MissingMappingResponseMessage : msg;
+
+ MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;
+
+ byte platformId = msg0.platformId();
+ int typeId = msg0.typeId();
+ String resolvedClsName = msg0.className();
+
+ MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null);
+
+ GridFutureAdapter<MappingExchangeResult> fut = clientReqSyncMap.get(item);
+
+ if (fut != null) {
+ if (resolvedClsName != null) {
+ marshallerCtx.onMissedMappingResolved(item, resolvedClsName);
+
+ fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
+ }
+ else
+ fut.onDone(MappingExchangeResult.createFailureResult(
+ new IgniteCheckedException(
+ "Failed to resolve mapping [platformId: "
+ + platformId
+ + ", typeId: "
+ + typeId + "]")));
+ }
+ }
+ }
+
+ /**
+ * Discovery listener for {@link MappingProposedMessage}. Does nothing while the node is stopping or when the
+ * message is already marked as duplicated. For a message that is not in conflict, the mapping is proposed to
+ * the marshaller context; if a class name comes back, the message is marked duplicated (same class name) or
+ * conflicting (different class name). For a message that is already in conflict, the node that originated it
+ * completes its pending exchange future with a duplicate-ID failure.
+ */
+ private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> {
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer,
+ ClusterNode snd,
+ MappingProposedMessage msg
+ ) {
+ if (!ctx.isStopping()) {
+ if (msg.duplicated())
+ return;
+
+ if (!msg.inConflict()) {
+ MarshallerMappingItem item = msg.mappingItem();
+ String conflictingName = marshallerCtx.onMappingProposed(item);
+
+ if (conflictingName != null) {
+ if (conflictingName.equals(item.className()))
+ msg.markDuplicated();
+ else
+ msg.conflictingWithClass(conflictingName);
+ }
+ }
+ else {
+ UUID origNodeId = msg.origNodeId();
+
+ if (origNodeId.equals(ctx.localNodeId())) {
+ GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem());
+
+ assert fut != null: msg;
+
+ fut.onDone(MappingExchangeResult.createFailureResult(
+ duplicateMappingException(msg.mappingItem(), msg.conflictingClassName())));
+ }
+ }
+ }
+ }
+
+ /**
+ * Builds the exception used to fail a proposal whose type ID is already mapped to another class.
+ *
+ * @param mappingItem Mapping item.
+ * @param conflictingClsName Conflicting class name.
+ * @return Exception whose message lists the platform ID, type ID, old class and new class.
+ */
+ private IgniteCheckedException duplicateMappingException(
+ MarshallerMappingItem mappingItem,
+ String conflictingClsName
+ ) {
+ return new IgniteCheckedException("Duplicate ID [platformId="
+ + mappingItem.platformId()
+ + ", typeId="
+ + mappingItem.typeId()
+ + ", oldCls="
+ + conflictingClsName
+ + ", newCls="
+ + mappingItem.className() + "]");
+ }
+ }
+
+ /**
+ * Discovery listener for {@link MappingAcceptedMessage}: passes the accepted item to the marshaller context,
+ * notifies all mapping-updated listeners through the closure processor, and completes the pending exchange
+ * future for the item, if there is one, with a successful result.
+ */
+ private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> {
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer,
+ ClusterNode snd,
+ MappingAcceptedMessage msg
+ ) {
+ final MarshallerMappingItem item = msg.getMappingItem();
+ marshallerCtx.onMappingAccepted(item);
+
+ closProc.runLocalSafe(new Runnable() {
+ @Override public void run() {
+ for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs)
+ lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className());
+ }
+ });
+
+ GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
+
+ if (fut != null)
+ fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Adds the cached mappings of the marshaller context as common data, unless common data for this component
+ * has already been collected in the bag.
+ */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal()))
+ dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Treats the common data as a list of maps and passes every non-null map to the marshaller context together
+ * with its list index cast to {@code byte} (presumably the platform ID - confirm against the context).
+ */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>) data.commonData();
+
+ if (mappings != null) {
+ for (int i = 0; i < mappings.size(); i++) {
+ Map<Integer, MappedName> map;
+
+ if ((map = mappings.get(i)) != null)
+ marshallerCtx.onMappingDataReceived((byte) i, map);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Completes all pending futures with a failure result wrapping
+ * {@link IgniteClientDisconnectedCheckedException}. Note that the exception is built with
+ * {@code ctx.cluster().clientReconnectFuture()}; the {@code reconnectFut} argument itself is not used.
+ */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException(
+ ctx.cluster().clientReconnectFuture(),
+ "Failed to propose or request mapping, client node disconnected.")));
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Notifies the marshaller context that the processor is stopping and completes all pending futures with the
+ * "exchange disabled" result.
+ */
+ @Override public void onKernalStop(boolean cancel) {
+ marshallerCtx.onMarshallerProcessorStop();
+
+ cancelFutures(MappingExchangeResult.createExchangeDisabledResult());
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return MARSHALLER_PROC;
+ }
+
+ /**
+ * Completes every future currently held in {@code mappingExchangeSyncMap} and {@code clientReqSyncMap} with
+ * the given result. The maps themselves are not cleared here.
+ *
+ * @param res Result to complete the pending futures with.
+ */
+ private void cancelFutures(MappingExchangeResult res) {
+ for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values())
+ fut.onDone(res);
+
+ for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values())
+ fut.onDone(res);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 5fcd2eb,ea8f361..efd1926
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@@ -55,15 -44,12 +55,17 @@@ import org.apache.ignite.configuration.
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
-import org.apache.ignite.internal.binary.*;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
+ import org.apache.ignite.internal.processors.platform.plugin.cache.PlatformCachePluginConfiguration;
-import org.apache.ignite.platform.dotnet.*;
+import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction;
+import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration;
+import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration;
+import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative;
+import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration;
+ import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean;
@@@ -821,19 -865,19 +850,19 @@@ public class PlatformConfigurationUtil
* Write query entity.
*
* @param writer Writer.
-- * @param queryEntity Query entity.
++ * @param qryEntity Query entity.
*/
-- private static void writeQueryEntity(BinaryRawWriter writer, QueryEntity queryEntity) {
-- assert queryEntity != null;
++ private static void writeQueryEntity(BinaryRawWriter writer, QueryEntity qryEntity) {
++ assert qryEntity != null;
-- writer.writeString(queryEntity.getKeyType());
-- writer.writeString(queryEntity.getValueType());
++ writer.writeString(qryEntity.getKeyType());
++ writer.writeString(qryEntity.getValueType());
// Fields
-- LinkedHashMap<String, String> fields = queryEntity.getFields();
++ LinkedHashMap<String, String> fields = qryEntity.getFields();
if (fields != null) {
-- Set<String> keyFields = queryEntity.getKeyFields();
++ Set<String> keyFields = qryEntity.getKeyFields();
writer.writeInt(fields.size());
@@@ -847,7 -891,7 +876,7 @@@
writer.writeInt(0);
// Aliases
-- Map<String, String> aliases = queryEntity.getAliases();
++ Map<String, String> aliases = qryEntity.getAliases();
if (aliases != null) {
writer.writeInt(aliases.size());
@@@ -861,7 -905,7 +890,7 @@@
writer.writeInt(0);
// Indexes
-- Collection<QueryIndex> indexes = queryEntity.getIndexes();
++ Collection<QueryIndex> indexes = qryEntity.getIndexes();
if (indexes != null) {
writer.writeInt(indexes.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
index 772813f,fa33d3a..456bb86
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
@@@ -67,13 -65,13 +67,13 @@@ public class CachePluginManager extend
public CachePluginManager(GridKernalContext ctx, CacheConfiguration cfg) {
this.ctx = ctx;
this.cfg = cfg;
-
+
- for (PluginProvider p : ctx.plugins().allProviders()) {
- CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg);
+ if (cfg.getPluginConfigurations() != null) {
+ for (CachePluginConfiguration cachePluginCfg : cfg.getPluginConfigurations()) {
+ CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg, cachePluginCfg);
- CachePluginProvider provider = p.createCacheProvider(pluginCtx);
+ CachePluginProvider provider = cachePluginCfg.createProvider(pluginCtx);
- if (provider != null) {
providersList.add(provider);
providersMap.put(pluginCtx, provider);
}
@@@ -99,9 -97,9 +99,9 @@@
}
/** {@inheritDoc} */
- @Override protected void stop0(boolean cancel) {
+ @Override protected void stop0(boolean cancel, boolean destroy) {
- for (ListIterator<CachePluginProvider> iter = providersList.listIterator(); iter.hasPrevious();)
- iter.previous().stop(cancel);
+ for (int i = providersList.size() - 1; i >= 0; i--)
+ providersList.get(i).stop(cancel);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index cf85a52,0ff6d8b..8b282ab
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@@ -62,9 -62,9 +62,10 @@@ import org.apache.ignite.configuration.
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryObjectEx;
+ import org.apache.ignite.internal.binary.BinaryObjectExImpl;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@@ -742,83 -685,40 +742,70 @@@ public class GridQueryProcessor extend
if (log.isDebugEnabled())
log.debug("Store [space=" + space + ", key=" + key + ", val=" + val + "]");
-- CacheObjectContext coctx = null;
-
- if (ctx.indexing().enabled()) {
- coctx = cacheObjectContext(space);
-
- Object key0 = unwrap(key, coctx);
-
- Object val0 = unwrap(val, coctx);
-
- ctx.indexing().store(space, key0, val0, expirationTime);
- }
--
if (idx == null)
return;
if (!busyLock.enterBusy())
- return;
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
-- if (coctx == null)
-- coctx = cacheObjectContext(space);
++ CacheObjectContext coctx = cacheObjectContext(space);
- Class<?> valCls = null;
+ TypeDescriptor desc = typeByValue(coctx, key, val, true);
- TypeId id;
+ if (prevVal != null) {
+ TypeDescriptor prevValDesc = typeByValue(coctx, key, prevVal, false);
- boolean binaryVal = ctx.cacheObjects().isBinaryObject(val);
+ if (prevValDesc != null && prevValDesc != desc)
+ idx.remove(space, prevValDesc, key, partId, prevVal, prevVer);
+ }
- if (binaryVal) {
- int typeId = ctx.cacheObjects().typeId(val);
+ if (desc == null)
+ return;
- id = new TypeId(space, typeId);
- }
- else {
- valCls = val.value(coctx, false).getClass();
+ idx.store(space, desc, key, partId, val, ver, expirationTime, link);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
- id = new TypeId(space, valCls);
- }
+ /**
+ * @param coctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param checkType If {@code true} checks that key and value type correspond to found TypeDescriptor.
+ * @return Type descriptor if found.
+ * @throws IgniteCheckedException If type check failed.
+ */
+ @Nullable private TypeDescriptor typeByValue(CacheObjectContext coctx,
+ KeyCacheObject key,
+ CacheObject val,
+ boolean checkType)
+ throws IgniteCheckedException {
+ Class<?> valCls = null;
- TypeDescriptor desc = types.get(id);
+ TypeId id;
- if (desc == null || !desc.registered())
- return;
+ boolean binaryVal = ctx.cacheObjects().isBinaryObject(val);
+
+ if (binaryVal) {
+ int typeId = ctx.cacheObjects().typeId(val);
+ id = new TypeId(coctx.cacheName(), typeId);
+ }
+ else {
+ valCls = val.value(coctx, false).getClass();
+
+ id = new TypeId(coctx.cacheName(), valCls);
+ }
+
+ TypeDescriptor desc = types.get(id);
+
+ if (desc == null || !desc.registered())
+ return null;
+
+ if (checkType) {
if (!binaryVal && !desc.valueClass().isAssignableFrom(valCls))
throw new IgniteCheckedException("Failed to update index due to class name conflict" +
"(multiple classes with same simple name are stored in the same cache) " +
@@@ -1143,33 -1039,14 +1124,21 @@@
if (log.isDebugEnabled())
log.debug("Remove [space=" + space + ", key=" + key + ", val=" + val + "]");
- CacheObjectContext coctx = null;
-
- if (ctx.indexing().enabled()) {
- coctx = cacheObjectContext(space);
-
- Object key0 = unwrap(key, coctx);
-
- ctx.indexing().remove(space, key0);
- }
-
- // If val == null we only need to call SPI.
- if (idx == null)
+ if (idx == null || val == null)
return;
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to remove from index (grid is stopping).");
try {
- if (coctx == null)
- coctx = cacheObjectContext(space);
- idx.remove(space, key, val);
++ CacheObjectContext coctx = cacheObjectContext(space);
+
+ TypeDescriptor desc = typeByValue(coctx, key, val, false);
+
+ if (desc == null)
+ return;
+
+ idx.remove(space, desc, key, partId, val, ver);
}
finally {
busyLock.leaveBusy();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 914c3a3,986fff7..096c531
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -1666,26 -1651,24 +1671,26 @@@ public class GridServiceProcessor exten
onReassignmentFailed(topVer, retries);
}
+ Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
+
// Clean up zombie assignments.
- for (Cache.Entry<Object, Object> e :
- cache.entrySetx(CU.cachePrimary(ctx.grid().affinity(cache.name()), ctx.grid().localNode()))) {
- if (!(e.getKey() instanceof GridServiceAssignmentsKey))
- continue;
+ while (it.hasNext()) {
+ Cache.Entry<Object, Object> e = it.next();
- if (cache.context().affinity().primary(ctx.grid().localNode(), e.getKey(), topVer)) {
- String name = ((GridServiceAssignmentsKey)e.getKey()).name();
++ if (cache.context().affinity().primaryByKey(ctx.grid().localNode(), e.getKey(), topVer)) {
+ String name = ((GridServiceAssignmentsKey)e.getKey()).name();
- try {
- if (cache.get(new GridServiceDeploymentKey(name)) == null) {
- if (log.isDebugEnabled())
- log.debug("Removed zombie assignments: " + e.getValue());
+ try {
+ if (cache.get(new GridServiceDeploymentKey(name)) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Removed zombie assignments: " + e.getValue());
- cache.getAndRemove(e.getKey());
+ cache.getAndRemove(e.getKey());
+ }
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
}
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 389d058,1cca9d3..d2e0714
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@@ -74,7 -74,7 +74,8 @@@ import java.nio.channels.SelectionKey
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.nio.file.Files;
+import java.nio.file.Path;
+ import java.nio.file.Paths;
import java.security.AccessController;
import java.security.KeyManagementException;
import java.security.MessageDigest;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4b140be,4c89a7c..00ca409
mode 100644,100755..100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 2c86322,7a0b713..1343151
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@@ -5869,14 -5869,15 +5869,14 @@@ public abstract class GridCacheAbstract
@Override public void run(int idx) throws Exception {
GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
- if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
- return;
-
int size = 0;
+ if (ctx.isNear())
+ ctx = ctx.near().dht().context();
+
for (String key : keys) {
- if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+ if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
- GridCacheEntryEx e =
- ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+ GridCacheEntryEx e = ctx.cache().entryEx(key);
assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
assert !e.deleted() : "Entry is deleted: " + e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index be3933f,e76ab40..691100f
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@@ -411,9 -417,31 +410,28 @@@ public class GridCacheTestEntryEx exten
}
/** @inheritDoc */
- @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ @Override public void clearReserveForLoad(GridCacheVersion ver) {
+ assert false;
+ }
+
+ /** @inheritDoc */
+ @Override public EntryGetResult innerGetAndReserveForLoad(
- boolean readSwap,
+ boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean keepBinary,
+ @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ assert false;
+
+ return null;
+ }
+
+ /** @inheritDoc */
+ @Nullable @Override public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
- boolean readSwap,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
UUID subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index b746883,e47a18d..a78d8ef
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@@ -5553,12 -5545,10 +5551,12 @@@ public class IgniteCacheConfigVariation
int size = 0;
+ if (ctx.isNear())
+ ctx = ctx.near().dht().context();
+
for (String key : keys) {
- if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+ if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
- GridCacheEntryEx e =
- ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+ GridCacheEntryEx e = ctx.cache().entryEx(key);
assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
assert !e.deleted() : "Entry is deleted: " + e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------