You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/11 16:56:31 UTC
ignite git commit: ignite-5578 Affinity for local join
Repository: ignite
Updated Branches:
refs/heads/ignite-5578-locJoin [created] 6c52ee107
ignite-5578 Affinity for local join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c52ee10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c52ee10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c52ee10
Branch: refs/heads/ignite-5578-locJoin
Commit: 6c52ee107cde481f43bba1772267ab83361c9497
Parents: e93b284
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 11 18:04:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 11 19:56:23 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 9 +-
.../internal/managers/discovery/DiscoCache.java | 39 +++
.../cache/CacheAffinitySharedManager.java | 33 ++-
.../processors/cache/ExchangeContext.java | 59 +++++
.../GridCachePartitionExchangeManager.java | 7 +-
.../dht/preloader/CacheGroupAffinity.java | 156 +++++++++++
.../GridDhtPartitionsAbstractMessage.java | 9 +
.../GridDhtPartitionsExchangeFuture.java | 259 +++++++++++++------
.../preloader/GridDhtPartitionsFullMessage.java | 90 ++++++-
.../GridDhtPartitionsSingleMessage.java | 44 +++-
.../GridCacheDatabaseSharedManager.java | 1 +
11 files changed, 593 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3dac18e..261a619 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinity;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -869,7 +870,13 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..119] [124..127] [-23..-27] [-36..-55]- this
+ case 128:
+ msg = new CacheGroupAffinity();
+
+ break;
+
+
+ // [-3..119] [124..128] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
// [-54..-60] - Snapshots
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4c1077b..72f482a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -280,6 +280,45 @@ public class DiscoCache {
}
/**
+ * @param order Order.
+ * @return Server node instance.
+ */
+ @Nullable public ClusterNode serverNodeByOrder(long order) {
+ int idx = serverNodeBinarySearch(order);
+
+ if (idx >= 0)
+ return srvNodes.get(idx);
+
+ return null;
+ }
+
+ /**
+ * @param order Node order.
+ * @return Node index.
+ */
+ private int serverNodeBinarySearch(long order) {
+ int low = 0;
+ int high = srvNodes.size() - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+
+ ClusterNode midVal = srvNodes.get(mid);
+
+ int cmp = Long.compare(midVal.order(), order);
+
+ if (cmp < 0)
+ low = mid + 1;
+ else if (cmp > 0)
+ high = mid - 1;
+ else
+ return mid;
+ }
+
+ return -(low + 1);
+ }
+
+ /**
* @param nodes Cluster nodes.
* @return Empty collection if nodes list is {@code null}
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 79ab183..f72d0e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1224,6 +1224,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param topVer Topology version.
+ * @param grpId Cache group ID.
+ * @return Affinity assignments.
+ */
+ public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer grpId) {
+ CacheGroupHolder grpHolder = grpHolders.get(grpId);
+
+ assert grpHolder != null : grpId;
+
+ return grpHolder.affinity().assignments(topVer);
+ }
+
+ /**
* Called on exchange initiated by server node join.
*
* @param fut Exchange future.
@@ -1319,18 +1332,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
grp.affinity().initialize(fut.topologyVersion(), assignment);
}
else {
- CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
+ if (fut.context().fetchAffinityOnJoin()) {
+ CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
- assert grpDesc != null : grp.cacheOrGroupName();
+ assert grpDesc != null : grp.cacheOrGroupName();
- GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
- grpDesc.groupId(),
- topVer,
- fut.discoCache());
+ GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+ grpDesc.groupId(),
+ topVer,
+ fut.discoCache());
- fetchFut.init(false);
+ fetchFut.init(false);
- fetchFuts.add(fetchFut);
+ fetchFuts.add(fetchFut);
+ }
+ else
+ fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
new file mode 100644
index 0000000..167ec4b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ExchangeContext {
+ /** */
+ private Set<Integer> requestGrpsAffOnJoin;
+
+ /** */
+ private boolean fetchAffOnJoin;
+
+ /**
+ * @return {@code True} if on local join need fetch affinity per-group (old protocol),
+ * otherwise affinity is sent in {@link GridDhtPartitionsFullMessage}.
+ */
+ public boolean fetchAffinityOnJoin() {
+ return false;
+ }
+
+ /**
+ * @param grpId Cache group ID.
+ */
+ public void addGroupAffinityRequestOnJoin(Integer grpId) {
+ if (requestGrpsAffOnJoin == null)
+ requestGrpsAffOnJoin = new HashSet<>();
+
+ requestGrpsAffOnJoin.add(grpId);
+ }
+
+ /**
+ * @return Groups to request affinity for.
+ */
+ @Nullable public Set<Integer> groupsAffinityRequestOnJoin() {
+ return requestGrpsAffOnJoin;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d4fe93f..69653a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1068,8 +1068,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param id ID.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
- GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
- id,
+ GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
cctx.kernalContext().clientNode(),
false);
@@ -1090,14 +1089,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param targetNode Target node.
* @param exchangeId ID.
* @param clientOnlyExchange Client exchange flag.
* @param sndCounters {@code True} if need send partition update counters.
* @return Message.
*/
- public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
- @Nullable GridDhtPartitionExchangeId exchangeId,
+ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId,
boolean clientOnlyExchange,
boolean sndCounters)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
new file mode 100644
index 0000000..e29ee06
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CacheGroupAffinity implements Message {
+ /** */
+ private int grpId;
+
+ /** */
+ @GridDirectCollection(GridLongList.class)
+ private List<GridLongList> assign;
+
+ /**
+ *
+ */
+ public CacheGroupAffinity() {
+ // No-op.
+ }
+
+ /**
+ * @param grpId Group ID.
+ * @param assign0 Assignment.
+ */
+ CacheGroupAffinity(int grpId, List<List<ClusterNode>> assign0) {
+ this.grpId = grpId;
+
+ assign = new ArrayList<>(assign0.size());
+
+ for (int i = 0; i < assign0.size(); i++) {
+ List<ClusterNode> nodes = assign0.get(i);
+
+ GridLongList l = new GridLongList(nodes.size());
+
+ for (int n = 0; n < nodes.size(); n++)
+ l.add(nodes.get(n).order());
+
+ assign.add(l);
+ }
+ }
+
+ /**
+ * @return Cache group ID.
+ */
+ int groupId() {
+ return grpId;
+ }
+
+ /**
+ * @return Assignments.
+ */
+ List<GridLongList> assignments() {
+ return assign;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeCollection("assign", assign, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeInt("grpId", grpId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ assign = reader.readCollection("assign", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ grpId = reader.readInt("grpId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CacheGroupAffinity.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 128;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 441952d..20b33e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -64,6 +64,15 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
this.lastVer = lastVer;
}
+ /**
+ * @param msg Message.
+ */
+ void copyStateTo(GridDhtPartitionsAbstractMessage msg) {
+ msg.exchId = exchId;
+ msg.lastVer = lastVer;
+ msg.flags = flags;
+ }
+
/** {@inheritDoc} */
@Override public boolean cacheGroupMessage() {
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8989aaa..72a3fe5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.ExchangeContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -75,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -187,9 +189,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private CacheAffinityChangeMessage affChangeMsg;
- /** */
- private boolean clientOnlyExchange;
-
/** Init timestamp. Used to track the amount of time spent to complete the future. */
private long initTs;
@@ -219,6 +218,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private final AtomicBoolean done = new AtomicBoolean();
+ /** */
+ @GridToStringExclude
+ private ExchangeContext exchCtx;
+
/**
* @param cctx Cache context.
* @param busyLock Busy lock.
@@ -254,6 +257,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * @return Exchange context.
+ */
+ public ExchangeContext context() {
+ assert exchCtx != null : this;
+
+ return exchCtx;
+ }
+
+ /**
* @param exchActions Exchange actions.
*/
public void exchangeActions(ExchangeActions exchActions) {
@@ -423,6 +435,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert discoEvt != null : this;
assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
+ exchCtx = new ExchangeContext();
+
try {
discoCache.updateAlives(cctx.discovery());
@@ -479,26 +493,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
}
- else {
- cctx.activate();
-
- List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
- cctx.cache().cachesToStartOnLocalJoin();
-
- if (cctx.database().persistenceEnabled() &&
- !cctx.kernalContext().clientNode()) {
- List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
-
- if (caches != null) {
- for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
- startDescs.add(c.get1());
- }
-
- cctx.database().readCheckpointAndRestoreMemory(startDescs);
- }
-
- cctx.cache().startCachesOnLocalJoin(caches, topVer);
- }
+ else
+ initCachesOnLocalJoin();
}
exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -565,6 +561,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @throws IgniteCheckedException If failed.
*/
+ private void initCachesOnLocalJoin() throws IgniteCheckedException {
+ cctx.activate();
+
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
+ cctx.cache().cachesToStartOnLocalJoin();
+
+ if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) {
+ List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
+
+ if (caches != null) {
+ for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
+ startDescs.add(c.get1());
+ }
+
+ cctx.database().readCheckpointAndRestoreMemory(startDescs);
+ }
+
+ cctx.cache().startCachesOnLocalJoin(caches, topologyVersion());
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
private void initTopologies() throws IgniteCheckedException {
cctx.database().checkpointReadLock();
@@ -783,40 +802,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @throws IgniteCheckedException If failed.
*/
private void clientOnlyExchange() throws IgniteCheckedException {
- clientOnlyExchange = true;
-
if (crd != null) {
- if (crd.isLocal()) {
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- boolean updateTop = !grp.isLocal() &&
- exchId.topologyVersion().equals(grp.localStartVersion());
-
- if (updateTop) {
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- if (top.groupId() == grp.groupId()) {
- GridDhtPartitionFullMap fullMap = top.partitionMap(true);
+ assert !crd.isLocal() : crd;
- assert fullMap != null;
-
- grp.topology().update(topologyVersion(),
- fullMap,
- top.updateCounters(false),
- Collections.<Integer>emptySet());
-
- break;
- }
- }
- }
- }
- }
- else {
- if (!centralizedAff)
- sendLocalPartitions(crd);
+ if (!centralizedAff)
+ sendLocalPartitions(crd);
- initDone();
+ initDone();
- return;
- }
+ return;
}
else {
if (centralizedAff) { // Last server node failed.
@@ -869,7 +863,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
continue;
- grp.topology().beforeExchange(this, !centralizedAff);
+ if (!localJoinExchange() || cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))
+ grp.topology().beforeExchange(this, !centralizedAff);
}
cctx.database().beforeExchange(this);
@@ -889,8 +884,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
*/
private void tryToPerformLocalSnapshotOperation() {
try {
- IgniteInternalFuture fut = cctx.snapshot()
- .tryStartLocalSnapshotOperation(discoEvt);
+ IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
if (fut != null)
fut.get();
@@ -1096,12 +1090,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * @return {@code True} if exchange for local node join.
+ */
+ private boolean localJoinExchange() {
+ return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal();
+ }
+
+ /**
* @param node Node.
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
assert node != null;
+ GridDhtPartitionsSingleMessage msg;
+
// Reset lost partition before send local partition to coordinator.
if (exchActions != null) {
Set<String> caches = exchActions.cachesToResetLostPartitions();
@@ -1110,22 +1113,33 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
resetLostPartitions(caches);
}
- GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
- node, exchangeId(), clientOnlyExchange, true);
+ if (cctx.kernalContext().clientNode() || (localJoinExchange() && !cctx.database().persistenceEnabled())) {
+ msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+ cctx.kernalContext().clientNode(),
+ cctx.versions().last(),
+ true);
+ }
+ else {
+ msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
+ false,
+ true);
- Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
+ Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
- if (partHistReserved0 != null)
- m.partitionHistoryCounters(partHistReserved0);
+ if (partHistReserved0 != null)
+ msg.partitionHistoryCounters(partHistReserved0);
+ }
if (stateChangeExchange() && changeGlobalStateE != null)
- m.setError(changeGlobalStateE);
+ msg.setError(changeGlobalStateE);
+ else if (localJoinExchange())
+ msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
if (log.isDebugEnabled())
- log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
+ log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']');
try {
- cctx.io().send(node, m, SYSTEM_POOL);
+ cctx.io().send(node, msg, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -1155,21 +1169,40 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param nodes Nodes.
+ * @param cachesAff Affinity if was requested by some nodes.
* @throws IgniteCheckedException If failed.
*/
- private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
- GridDhtPartitionsFullMessage m = createPartitionsMessage(true);
+ private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinity> cachesAff)
+ throws IgniteCheckedException {
+ GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+
+ GridDhtPartitionsFullMessage msgWithAff = null;
assert !nodes.contains(cctx.localNode());
if (log.isDebugEnabled()) {
log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
- ", exchId=" + exchId + ", msg=" + m + ']');
+ ", exchId=" + exchId + ", msg=" + msg + ']');
}
+ msg.prepareMarshal(cctx);
+
for (ClusterNode node : nodes) {
+ GridDhtPartitionsFullMessage sndMsg = msg;
+
+ if (cachesAff != null) {
+ GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
+
+ if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
+ if (msgWithAff == null)
+ msgWithAff = msg.copyWithAffinity(cachesAff);
+
+ sndMsg = msgWithAff;
+ }
+ }
+
try {
- cctx.io().send(node, m, SYSTEM_POOL);
+ cctx.io().send(node, sndMsg, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
if (cctx.io().checkNodeLeft(node.id(), e, false)) {
@@ -1687,21 +1720,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
- if (msg instanceof GridDhtPartitionsSingleMessage) {
- GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+ Map<Integer, CacheGroupAffinity> cachesAff = null;
+
+ for (GridDhtPartitionsSingleMessage msg : msgs.values()) {
+ for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
+ Integer grpId = entry.getKey();
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
- for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) {
- Integer grpId = entry.getKey();
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+ GridDhtPartitionTopology top = grp != null ? grp.topology() :
+ cctx.exchange().clientTopology(grpId, this);
- GridDhtPartitionTopology top = grp != null ? grp.topology() :
- cctx.exchange().clientTopology(grpId, this);
+ Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId);
- Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId);
+ if (cntrs != null)
+ top.applyUpdateCounters(cntrs);
+ }
+
+ Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+ if (affReq != null) {
+ if (cachesAff == null)
+ cachesAff = U.newHashMap(affReq.size());
- if (cntrs != null)
- top.applyUpdateCounters(cntrs);
+ for (Integer grpId : affReq) {
+ List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId);
+
+ cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
}
}
}
@@ -1777,7 +1821,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (!nodes.isEmpty())
- sendAllPartitions(nodes);
+ sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null);
onDone(exchangeId().topologyVersion(), err);
}
@@ -1813,7 +1857,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
if (n != null)
- sendAllPartitions(F.asList(n));
+ sendAllPartitions(F.asList(n), null);
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
@@ -1907,6 +1951,59 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
+
+ if (localJoinExchange() && affReq != null) {
+ Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+ Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity();
+
+ assert !F.isEmpty(cachesAff) : cachesAff;
+ assert cachesAff.size() >= affReq.size();
+
+ int cnt = 0;
+
+ for (CacheGroupAffinity aff : cachesAff) {
+ CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+ if (grp != null) {
+ assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
+
+ List<GridLongList> assignments = aff.assignments();
+ List<List<ClusterNode>> assignments0 = new ArrayList<>(assignments.size());
+
+ for (int p = 0; p < assignments.size(); p++) {
+ GridLongList assign = assignments.get(p);
+ List<ClusterNode> assign0 = new ArrayList<>(assign.size());
+
+ for (int n = 0; n < assign.size(); n++) {
+ long order = assign.get(n);
+
+ ClusterNode affNode = nodesByOrder.get(order);
+
+ if (affNode == null) {
+ affNode = discoCache.serverNodeByOrder(order);
+
+ assert affNode != null : order;
+
+ nodesByOrder.put(order, affNode);
+ }
+
+ assign0.add(affNode);
+ }
+
+ assignments0.add(assign0);
+ }
+
+ grp.affinity().initialize(topologyVersion(), assignments0);
+
+ cnt++;
+ }
+ }
+
+ assert affReq.size() == cnt : cnt;
+ }
+
updatePartitionFullMap(msg);
IgniteCheckedException err = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 75609b8..0a723f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,19 +19,23 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,6 +103,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@GridDirectTransient
private transient boolean compress;
+ /** */
+ @GridDirectCollection(CacheGroupAffinity.class)
+ private Collection<CacheGroupAffinity> cachesAff;
+
/**
* Required by {@link Externalizable}.
*/
@@ -126,6 +134,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/** {@inheritDoc} */
+ @Override void copyStateTo(GridDhtPartitionsAbstractMessage msg) {
+ super.copyStateTo(msg);
+
+ GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg;
+
+ cp.parts = parts;
+ cp.dupPartsData = dupPartsData;
+ cp.partsBytes = partsBytes;
+ cp.partCntrs = partCntrs;
+ cp.partCntrsBytes = partCntrsBytes;
+ cp.partHistSuppliers = partHistSuppliers;
+ cp.partHistSuppliersBytes = partHistSuppliersBytes;
+ cp.partsToReload = partsToReload;
+ cp.partsToReloadBytes = partsToReloadBytes;
+ cp.topVer = topVer;
+ cp.cachesAff = cachesAff;
+ }
+
+ /**
+ * @param cachesAff Affinity.
+ * @return Message copy.
+ */
+ public GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) {
+ assert !F.isEmpty(cachesAff) : cachesAff;
+
+ GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
+
+ copyStateTo(cp);
+
+ cp.cachesAff = cachesAff;
+
+ return cp;
+ }
+
+ /**
+ * @return Affinity.
+ */
+ @Nullable public Collection<CacheGroupAffinity> cachesAffinity() {
+ return cachesAff;
+ }
+
+ /** {@inheritDoc} */
@Override public int handlerId() {
return 0;
}
@@ -406,42 +456,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (writer.state()) {
case 5:
- if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+ if (!writer.writeCollection("cachesAff", cachesAff, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 6:
- if (!writer.writeByteArray("errsBytes", errsBytes))
+ if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ if (!writer.writeByteArray("errsBytes", errsBytes))
return false;
writer.incrementState();
case 8:
- if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByteArray("partsBytes", partsBytes))
+ if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
return false;
writer.incrementState();
case 10:
- if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+ if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
writer.incrementState();
case 11:
+ if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -464,7 +520,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (reader.state()) {
case 5:
- dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+ cachesAff = reader.readCollection("cachesAff", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -472,7 +528,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 6:
- errsBytes = reader.readByteArray("errsBytes");
+ dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
if (!reader.isLastRead())
return false;
@@ -480,7 +536,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 7:
- partCntrsBytes = reader.readByteArray("partCntrsBytes");
+ errsBytes = reader.readByteArray("errsBytes");
if (!reader.isLastRead())
return false;
@@ -488,7 +544,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 8:
- partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
return false;
@@ -496,7 +552,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 9:
- partsBytes = reader.readByteArray("partsBytes");
+ partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
if (!reader.isLastRead())
return false;
@@ -504,7 +560,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 10:
- partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+ partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
return false;
@@ -512,6 +568,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 11:
+ partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -531,7 +595,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 13;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index b4d25c4..4c98742 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.util.Collection;
import java.util.Map;
import java.util.HashMap;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.io.Externalizable;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -88,6 +90,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
@GridDirectTransient
private transient boolean compress;
+ /** */
+ @GridDirectCollection(Integer.class)
+ private Collection<Integer> grpsAffRequest;
+
/**
* Required by {@link Externalizable}.
*/
@@ -111,6 +117,20 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
this.compress = compress;
}
+ /**
+ * @param grpsAffRequest
+ */
+ public void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) {
+ this.grpsAffRequest = grpsAffRequest;
+ }
+
+ /**
+ * @return
+ */
+ @Nullable public Collection<Integer> cacheGroupsAffinityRequest() {
+ return grpsAffRequest;
+ }
+
/** {@inheritDoc} */
@Override public int handlerId() {
return 0;
@@ -374,18 +394,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
case 8:
- if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
case 10:
+ if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -432,7 +458,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 8:
- partCntrsBytes = reader.readByteArray("partCntrsBytes");
+ grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -440,7 +466,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 9:
- partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
return false;
@@ -448,6 +474,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 10:
+ partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -467,7 +501,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 11;
+ return 12;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 5136731..1f56cc0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1540,6 +1540,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
long partMetaId = pageMem.partitionMetaPageId(grpId, i);
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+
try {
long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);