You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/10 12:47:06 UTC
[2/2] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93724584
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93724584
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93724584
Branch: refs/heads/ignite-5075
Commit: 93724584a5f3c791fa517b304cd6bc72f127b22c
Parents: 0096266
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 10 11:10:20 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 10 15:46:54 2017 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 50 ++++++++-
.../cache/CacheGroupInfrastructure.java | 12 ++
.../processors/cache/GridCacheEntryInfo.java | 95 ++++++----------
.../cache/GridCacheGroupIdMessage.java | 105 +++++++++++++++++
.../processors/cache/GridCacheIdMessage.java | 112 +++++++++++++++++++
.../processors/cache/GridCacheIoManager.java | 85 +++++++-------
.../processors/cache/GridCacheMessage.java | 91 +++++++--------
.../GridCachePartitionExchangeManager.java | 28 +++--
.../cache/GridCachePreloaderAdapter.java | 9 +-
.../processors/cache/GridCacheProcessor.java | 20 +++-
.../GridChangeGlobalStateMessageResponse.java | 15 ++-
.../distributed/GridCacheTtlUpdateRequest.java | 4 +-
.../distributed/GridDistributedBaseMessage.java | 3 +-
.../GridDistributedTxFinishResponse.java | 23 ++--
.../dht/GridDhtAffinityAssignmentRequest.java | 10 +-
.../dht/GridDhtAffinityAssignmentResponse.java | 12 +-
.../distributed/dht/GridDhtLockResponse.java | 2 +-
.../dht/GridDhtTxFinishResponse.java | 14 +--
.../dht/GridDhtTxOnePhaseCommitAckRequest.java | 11 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 4 +-
.../GridDhtAtomicAbstractUpdateRequest.java | 4 +-
.../dht/atomic/GridDhtAtomicCache.java | 6 +-
.../GridDhtAtomicDeferredUpdateResponse.java | 4 +-
.../dht/atomic/GridDhtAtomicNearResponse.java | 4 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 5 +-
.../GridNearAtomicAbstractUpdateRequest.java | 4 +-
.../GridNearAtomicCheckUpdateRequest.java | 4 +-
.../atomic/GridNearAtomicUpdateResponse.java | 4 +-
.../dht/preloader/GridDhtForceKeysRequest.java | 4 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 8 +-
.../GridDhtPartitionDemandMessage.java | 12 +-
.../dht/preloader/GridDhtPartitionDemander.java | 8 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 11 +-
.../GridDhtPartitionSupplyMessage.java | 32 +++---
.../GridDhtPartitionsAbstractMessage.java | 17 ++-
.../preloader/GridDhtPartitionsFullMessage.java | 27 +++--
.../GridDhtPartitionsSingleMessage.java | 27 +++--
.../GridDhtPartitionsSingleRequest.java | 7 +-
.../dht/preloader/GridDhtPreloader.java | 6 +-
.../distributed/near/GridNearGetRequest.java | 4 +-
.../distributed/near/GridNearGetResponse.java | 4 +-
.../near/GridNearSingleGetRequest.java | 4 +-
.../near/GridNearSingleGetResponse.java | 4 +-
.../near/GridNearTxFinishResponse.java | 14 +--
.../processors/cache/local/GridLocalCache.java | 2 +-
.../local/atomic/GridLocalAtomicCache.java | 2 +-
.../cache/query/GridCacheQueryRequest.java | 3 +-
.../cache/query/GridCacheQueryResponse.java | 4 +-
.../CacheContinuousQueryBatchAck.java | 4 +-
.../cache/transactions/TxLocksRequest.java | 15 ++-
.../cache/transactions/TxLocksResponse.java | 23 ++--
.../CacheAtomicSingleMessageCountSelfTest.java | 2 +-
.../GridCacheConditionalDeploymentSelfTest.java | 7 ++
.../cache/IgniteOnePhaseCommitInvokeTest.java | 4 +-
.../CacheLateAffinityAssignmentTest.java | 4 +-
.../IgniteCacheReadFromBackupTest.java | 5 +-
.../atomic/IgniteCacheAtomicProtocolTest.java | 4 +-
.../near/GridCacheNearReadersSelfTest.java | 4 +-
.../communication/GridCacheMessageSelfTest.java | 15 +++
.../testframework/junits/GridAbstractTest.java | 17 +++
.../query/h2/database/H2PkHashIndex.java | 3 +-
.../query/h2/database/H2RowFactory.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 12 +-
63 files changed, 711 insertions(+), 350 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 99ec08a..eb083da 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,8 +44,25 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.transactions.TxLocksRequest;
+import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -170,7 +187,34 @@ public class MessageCodeGenerator {
// gen.generateAll(true);
-// gen.generateAndWrite(GridChangeGlobalStateMessageResponse.class);
+ gen.generateAndWrite(GridCacheIdMessage.class);
+ gen.generateAndWrite(GridCacheGroupIdMessage.class);
+
+ gen.generateAndWrite(GridCacheEntryInfo.class);
+
+ gen.generateAndWrite(GridCacheMessage.class);
+
+ gen.generateAndWrite(GridDhtPartitionSupplyMessage.class);
+ gen.generateAndWrite(GridDhtPartitionDemandMessage.class);
+
+ gen.generateAndWrite(TxLocksRequest.class);
+ gen.generateAndWrite(TxLocksResponse.class);
+
+ gen.generateAndWrite(GridDhtTxOnePhaseCommitAckRequest.class);
+ gen.generateAndWrite(GridChangeGlobalStateMessageResponse.class);
+
+ gen.generateAndWrite(GridDhtPartitionsFullMessage.class);
+ gen.generateAndWrite(GridDhtPartitionsSingleMessage.class);
+ gen.generateAndWrite(GridDhtPartitionsSingleRequest.class);
+
+ gen.generateAndWrite(GridDhtAffinityAssignmentRequest.class);
+ gen.generateAndWrite(GridDhtAffinityAssignmentResponse.class);
+
+ gen.generateAndWrite(GridDistributedTxFinishResponse.class);
+ gen.generateAndWrite(GridDhtTxFinishResponse.class);
+ gen.generateAndWrite(GridNearTxFinishResponse.class);
+
+ gen.generateAndWrite(GridDhtPartitionsAbstractMessage.class);
// gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 7051547..786335c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -101,6 +101,9 @@ public class CacheGroupInfrastructure {
/** IO policy. */
private final byte ioPlc;
+ /** Flag indicating that deployment is enabled and binary is not enabled for the cache configuration. */
+ private boolean depEnabled;
+
/**
* @param grpId Group ID.
* @param ctx Context.
@@ -134,9 +137,18 @@ public class CacheGroupInfrastructure {
ioPlc = cacheType.ioPolicy();
+ depEnabled = ctx.kernalContext().deploy().enabled() && !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg);
+
log = ctx.kernalContext().log(getClass());
}
+ /**
+ * @return {@code True} if deployment is enabled.
+ */
+ public boolean deploymentEnabled() {
+ return depEnabled;
+ }
+
public GridCachePreloader preloader() {
return preldr;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index e50fbfe..852d95d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -42,9 +42,6 @@ public class GridCacheEntryInfo implements Message {
@GridToStringInclude
private KeyCacheObject key;
- /** Key bytes, set when entry is read from swap and there is no key instance. */
- private byte[] keyBytes;
-
/** Cache ID. */
private int cacheId;
@@ -90,20 +87,6 @@ public class GridCacheEntryInfo implements Message {
}
/**
- * @param bytes Key bytes.
- */
- public void keyBytes(byte[] bytes) {
- this.keyBytes = bytes;
- }
-
- /**
- * @return Key bytes.
- */
- public byte[] keyBytes() {
- return keyBytes;
- }
-
- /**
* @return Entry key.
*/
public KeyCacheObject key() {
@@ -230,24 +213,18 @@ public class GridCacheEntryInfo implements Message {
writer.incrementState();
case 3:
- if (!writer.writeByteArray("keyBytes", keyBytes))
- return false;
-
- writer.incrementState();
-
- case 4:
if (!writer.writeLong("ttl", ttl))
return false;
writer.incrementState();
- case 5:
+ case 4:
if (!writer.writeMessage("val", val))
return false;
writer.incrementState();
- case 6:
+ case 5:
if (!writer.writeMessage("ver", ver))
return false;
@@ -291,14 +268,6 @@ public class GridCacheEntryInfo implements Message {
reader.incrementState();
case 3:
- keyBytes = reader.readByteArray("keyBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
ttl = reader.readLong("ttl");
if (!reader.isLastRead())
@@ -306,7 +275,7 @@ public class GridCacheEntryInfo implements Message {
reader.incrementState();
- case 5:
+ case 4:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -314,7 +283,7 @@ public class GridCacheEntryInfo implements Message {
reader.incrementState();
- case 6:
+ case 5:
ver = reader.readMessage("ver");
if (!reader.isLastRead())
@@ -334,7 +303,7 @@ public class GridCacheEntryInfo implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 6;
}
/**
@@ -350,21 +319,13 @@ public class GridCacheEntryInfo implements Message {
/**
* @return Marshalled size.
*/
- public int marshalledSize(GridCacheContext ctx) throws IgniteCheckedException {
+ public int marshalledSize(CacheObjectContext ctx) throws IgniteCheckedException {
int size = 0;
- CacheObjectContext cacheObjCtx = ctx.cacheObjectContext();
-
if (val != null)
- size += val.valueBytes(cacheObjCtx).length;
-
- if (key == null) {
- assert keyBytes != null;
+ size += val.valueBytes(ctx).length;
- size += keyBytes.length;
- }
- else
- size += key.valueBytes(cacheObjCtx).length;
+ size += key.valueBytes(ctx).length;
return SIZE_OVERHEAD + size;
}
@@ -374,13 +335,20 @@ public class GridCacheEntryInfo implements Message {
* @throws IgniteCheckedException In case of error.
*/
public void marshal(GridCacheContext ctx) throws IgniteCheckedException {
- assert key != null ^ keyBytes != null;
+ marshal(ctx.cacheObjectContext());
+ }
+
+ /**
+ * @param ctx Cache object context.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void marshal(CacheObjectContext ctx) throws IgniteCheckedException {
+ assert key != null;
- if (key != null)
- key.prepareMarshal(ctx.cacheObjectContext());
+ key.prepareMarshal(ctx);
if (val != null)
- val.prepareMarshal(ctx.cacheObjectContext());
+ val.prepareMarshal(ctx);
if (expireTime == 0)
expireTime = -1;
@@ -400,20 +368,21 @@ public class GridCacheEntryInfo implements Message {
* @throws IgniteCheckedException If unmarshalling failed.
*/
public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException {
- if (key == null) {
- assert keyBytes != null;
-
- CacheObjectContext cacheObjCtx = ctx.cacheObjectContext();
-
- Object key0 = ctx.cacheObjects().unmarshal(cacheObjCtx, keyBytes, clsLdr);
+ unmarshal(ctx.cacheObjectContext(), clsLdr);
+ }
- key = ctx.cacheObjects().toCacheKeyObject(cacheObjCtx, ctx, key0, false);
- }
- else
- key.finishUnmarshal(ctx.cacheObjectContext(), clsLdr);
+ /**
+ * Unmarshalls entry.
+ *
+ * @param ctx Cache object context.
+ * @param clsLdr Class loader.
+ * @throws IgniteCheckedException If unmarshalling failed.
+ */
+ public void unmarshal(CacheObjectContext ctx, ClassLoader clsLdr) throws IgniteCheckedException {
+ key.finishUnmarshal(ctx, clsLdr);
if (val != null)
- val.finishUnmarshal(ctx.cacheObjectContext(), clsLdr);
+ val.finishUnmarshal(ctx, clsLdr);
long remaining = expireTime;
@@ -428,4 +397,4 @@ public class GridCacheEntryInfo implements Message {
@Override public String toString() {
return S.toString(GridCacheEntryInfo.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
new file mode 100644
index 0000000..29a978e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
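+ * Base class for cache messages that carry a cache group ID; the group ID is also used as the handler ID.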
+ */
+public abstract class GridCacheGroupIdMessage extends GridCacheMessage {
+ /** Cache group ID. */
+ @GridToStringInclude
+ protected int grpId;
+
+ /**
+ * @return Cache group ID.
+ */
+ public int groupId() {
+ return grpId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final int handlerId() {
+ return grpId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 2:
+ if (!writer.writeInt("grpId", grpId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 2:
+ grpId = reader.readInt("grpId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridCacheGroupIdMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheGroupIdMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
new file mode 100644
index 0000000..25a553b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Base class for cache messages that carry a cache ID; the cache ID is also used as the handler ID.
+ */
+public abstract class GridCacheIdMessage extends GridCacheMessage {
+ /** Cache ID. */
+ @GridToStringInclude
+ protected int cacheId;
+
+ /**
+ * @return Cache ID.
+ */
+ public int cacheId() {
+ return cacheId;
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ */
+ public void cacheId(int cacheId) {
+ this.cacheId = cacheId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 2:
+ if (!writer.writeInt("cacheId", cacheId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 2:
+ cacheId = reader.readInt("cacheId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridCacheIdMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int handlerId() {
+ return cacheId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheIdMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5e7e401..0d279ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -157,7 +157,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug("Wait for exchange before processing message [msg=" + msg +
", node=" + nodeId +
", waitVer=" + waitVer +
- ", cacheDesc=" + cctx.cache().cacheDescriptor(cacheMsg.cacheId()) + ']');
+ ", cacheDesc=" + cacheDescriptor(cacheMsg) + ']');
}
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -246,6 +246,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
};
+ private DynamicCacheDescriptor cacheDescriptor(GridCacheMessage msg) {
+ return null; // TODO IGNITE-5075.
+ }
+
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
@@ -259,14 +263,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (msgIdx >= 0) {
Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
- IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.cacheId());
+ IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId());
if (cacheClsHandlers != null)
c = cacheClsHandlers[msgIdx];
}
if (c == null)
- c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
+ c = clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
if (c == null) {
IgniteLogger log = cacheMsg.messageLogger(cctx);
@@ -277,7 +281,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()).
append(", msgTopVer=").append(cacheMsg.topologyVersion()).
- append(", cacheDesc=").append(cctx.cache().cacheDescriptor(cacheMsg.cacheId())).
+ append(", cacheDesc=").append(cacheDescriptor(cacheMsg)).
append(']');
msg0.append(U.nl()).append("Registered listeners:");
@@ -512,14 +516,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c)
throws IgniteCheckedException {
- GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
+ assert msg != null;
+
+ GridCacheContext ctx = msg instanceof GridCacheIdMessage ?
+ cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null;
switch (msg.directType()) {
case 30: {
GridDhtLockRequest req = (GridDhtLockRequest)msg;
GridDhtLockResponse res = new GridDhtLockResponse(
- ctx.cacheId(),
+ req.cacheId(),
req.version(),
req.futureId(),
req.miniId(),
@@ -552,17 +559,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
- ctx.cacheId(),
+ req.cacheId(),
req.partition(),
req.futureId(),
- ctx.deploymentEnabled());
+ false);
res.onError(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
if (req.nearNodeId() != null) {
- GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+ GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
req.partition(),
req.nearFutureId(),
nodeId,
@@ -580,12 +587,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
- ctx.cacheId(),
+ req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
- ctx.deploymentEnabled());
+ false);
res.error(req.classError());
@@ -598,10 +605,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
- ctx.cacheId(),
+ req.cacheId(),
req.futureId(),
req.miniId(),
- ctx.deploymentEnabled()
+ false
);
res.error(req.classError());
@@ -621,7 +628,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearGetRequest req = (GridNearGetRequest)msg;
GridNearGetResponse res = new GridNearGetResponse(
- ctx.cacheId(),
+ req.cacheId(),
req.futureId(),
req.miniId(),
req.version(),
@@ -637,7 +644,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
case 50: {
GridNearGetResponse res = (GridNearGetResponse)msg;
- CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
+ CacheGetFuture fut = (CacheGetFuture)cctx.mvcc().future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -657,7 +664,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearLockRequest req = (GridNearLockRequest)msg;
GridNearLockResponse res = new GridNearLockResponse(
- ctx.cacheId(),
+ req.cacheId(),
req.version(),
req.futureId(),
req.miniId(),
@@ -665,7 +672,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
0,
req.classError(),
null,
- ctx.deploymentEnabled());
+ false);
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
}
@@ -704,7 +711,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
cctx.deploymentEnabled());
cctx.io().sendOrderedMessage(
- ctx.node(nodeId),
+ cctx.node(nodeId),
TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
res,
ctx.ioPolicy(),
@@ -723,7 +730,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
GridNearSingleGetResponse res = new GridNearSingleGetResponse(
- ctx.cacheId(),
+ req.cacheId(),
req.futureId(),
req.topologyVersion(),
null,
@@ -740,7 +747,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
case 117: {
GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
- GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc()
+ GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)cctx.mvcc()
.future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId()));
if (fut == null) {
@@ -761,12 +768,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
- ctx.cacheId(),
+ req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
- ctx.deploymentEnabled());
+ false);
res.error(req.classError());
@@ -779,12 +786,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
- ctx.cacheId(),
+ req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
- ctx.deploymentEnabled());
+ false);
res.error(req.classError());
@@ -797,12 +804,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
- ctx.cacheId(),
+ req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
- ctx.deploymentEnabled());
+ false);
res.error(req.classError());
@@ -815,17 +822,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
- ctx.cacheId(),
+ req.cacheId(),
req.partition(),
req.futureId(),
- ctx.deploymentEnabled());
+ false);
res.onError(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
if (req.nearNodeId() != null) {
- GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+ GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
req.partition(),
req.nearFutureId(),
nodeId,
@@ -875,8 +882,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (txState != null)
txState.unwindEvicts(cctx);
}
- else {
- GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
+ else if (msg instanceof GridCacheIdMessage) {
+ GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId());
if (ctx != null)
CU.unwindEvicts(ctx);
@@ -1170,13 +1177,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/**
* Adds message handler.
*
- * @param cacheId Cache ID.
+ * @param hndId Message handler ID.
* @param type Type of message.
* @param c Handler.
*/
@SuppressWarnings({"unchecked"})
public void addHandler(
- int cacheId,
+ int hndId,
Class<? extends GridCacheMessage> type,
IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
int msgIdx = messageIndex(type);
@@ -1184,16 +1191,16 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (msgIdx != -1) {
Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
- IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheId);
+ IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId);
if (cacheClsHandlers == null) {
cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
- idxClsHandlers0.put(cacheId, cacheClsHandlers);
+ idxClsHandlers0.put(hndId, cacheClsHandlers);
}
if (cacheClsHandlers[msgIdx] != null)
- throw new IgniteException("Duplicate cache message ID found [cacheId=" + cacheId +
+ throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId +
", type=" + type + ']');
cacheClsHandlers[msgIdx] = c;
@@ -1203,11 +1210,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
return;
}
else {
- ListenerKey key = new ListenerKey(cacheId, type);
+ ListenerKey key = new ListenerKey(hndId, type);
if (clsHandlers.putIfAbsent(key,
(IgniteBiInClosure<UUID, GridCacheMessage>)c) != null)
- assert false : "Handler for class already registered [cacheId=" + cacheId + ", cls=" + type +
+ assert false : "Handler for class already registered [hndId=" + hndId + ", cls=" + type +
", old=" + clsHandlers.get(key) + ", new=" + c + ']';
}
@@ -1215,7 +1222,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (log0 != null && log0.isTraceEnabled())
log0.trace(
- "Registered cache communication handler [cacheId=" + cacheId + ", type=" + type +
+ "Registered cache communication handler [hndId=" + hndId + ", type=" + type +
", msgIdx=" + msgIdx + ", handler=" + c + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 4de465c..ec5efad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -84,9 +84,10 @@ public abstract class GridCacheMessage implements Message {
@GridDirectTransient
private boolean skipPrepare;
- /** Cache ID. */
- @GridToStringInclude
- protected int cacheId;
+ /**
+ * @return ID to distinguish message handlers for the same messages but for different caches/cache groups.
+ */
+ public abstract int handlerId();
/**
* @return Error, if any.
@@ -170,20 +171,6 @@ public abstract class GridCacheMessage implements Message {
}
/**
- * @return Cache ID.
- */
- public int cacheId() {
- return cacheId;
- }
-
- /**
- * @param cacheId Cache ID.
- */
- public void cacheId(int cacheId) {
- this.cacheId = cacheId;
- }
-
- /**
* Gets topology version or -1 in case of topology version is not required for this message.
*
* @return Topology version.
@@ -205,6 +192,15 @@ public abstract class GridCacheMessage implements Message {
* @throws IgniteCheckedException If failed.
*/
protected final void prepareObject(@Nullable Object o, GridCacheContext ctx) throws IgniteCheckedException {
+ prepareObject(o, ctx.shared());
+ }
+
+ /**
+ * @param o Object to prepare for marshalling.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void prepareObject(@Nullable Object o, GridCacheSharedContext ctx) throws IgniteCheckedException {
assert addDepInfo || forceAddDepInfo;
if (!skipPrepare && o != null) {
@@ -281,22 +277,25 @@ public abstract class GridCacheMessage implements Message {
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
- protected final void marshalInfo(GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ protected final void marshalInfo(GridCacheEntryInfo info,
+ GridCacheSharedContext ctx,
+ CacheObjectContext cacheObjCtx
+ ) throws IgniteCheckedException {
assert ctx != null;
if (info != null) {
- info.marshal(ctx);
+ info.marshal(cacheObjCtx);
if (addDepInfo) {
if (info.key() != null)
- prepareObject(info.key().value(ctx.cacheObjectContext(), false), ctx);
+ prepareObject(info.key().value(cacheObjCtx, false), ctx);
CacheObject val = info.value();
if (val != null) {
- val.finishUnmarshal(ctx.cacheObjectContext(), ctx.deploy().globalLoader());
+ val.finishUnmarshal(cacheObjCtx, ctx.deploy().globalLoader());
- prepareObject(CU.value(val, ctx, false), ctx);
+ prepareObject(val.value(cacheObjCtx, false), ctx);
}
}
}
@@ -314,7 +313,7 @@ public abstract class GridCacheMessage implements Message {
assert ctx != null;
if (info != null)
- info.unmarshal(ctx, ldr);
+ info.unmarshal(ctx.cacheObjectContext(), ldr);
}
/**
@@ -324,13 +323,14 @@ public abstract class GridCacheMessage implements Message {
*/
protected final void marshalInfos(
Iterable<? extends GridCacheEntryInfo> infos,
- GridCacheContext ctx
+ GridCacheSharedContext ctx,
+ CacheObjectContext cacheObjCtx
) throws IgniteCheckedException {
assert ctx != null;
if (infos != null)
for (GridCacheEntryInfo e : infos)
- marshalInfo(e, ctx);
+ marshalInfo(e, ctx, cacheObjCtx);
}
/**
@@ -369,14 +369,14 @@ public abstract class GridCacheMessage implements Message {
if (addDepInfo) {
if (e.key() != null)
- prepareObject(e.key().value(cctx.cacheObjectContext(), false), cctx);
+ prepareObject(e.key().value(cctx.cacheObjectContext(), false), ctx);
if (e.value() != null)
- prepareObject(e.value().value(cctx.cacheObjectContext(), false), cctx);
+ prepareObject(e.value().value(cctx.cacheObjectContext(), false), ctx);
if (e.entryProcessors() != null) {
for (T2<EntryProcessor<Object, Object, Object>, Object[]> entProc : e.entryProcessors())
- prepareObject(entProc.get1(), cctx);
+ prepareObject(entProc.get1(), ctx);
}
}
else if (p2pEnabled && e.entryProcessors() != null) {
@@ -384,7 +384,7 @@ public abstract class GridCacheMessage implements Message {
forceAddDepInfo = true;
for (T2<EntryProcessor<Object, Object, Object>, Object[]> entProc : e.entryProcessors())
- prepareObject(entProc.get1(), cctx);
+ prepareObject(entProc.get1(), ctx);
}
}
}
@@ -435,7 +435,7 @@ public abstract class GridCacheMessage implements Message {
Object arg = args[i];
if (addDepInfo)
- prepareObject(arg, ctx);
+ prepareObject(arg, ctx.shared());
argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg);
}
@@ -487,7 +487,7 @@ public abstract class GridCacheMessage implements Message {
for (Object o : col) {
if (addDepInfo)
- prepareObject(o, ctx);
+ prepareObject(o, ctx.shared());
byteCol.add(o == null ? null : CU.marshal(ctx, o));
}
@@ -522,7 +522,7 @@ public abstract class GridCacheMessage implements Message {
obj.prepareMarshal(ctx.cacheObjectContext());
if (addDepInfo)
- prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
+ prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx.shared());
}
}
@@ -541,7 +541,7 @@ public abstract class GridCacheMessage implements Message {
obj.prepareMarshal(ctx.cacheObjectContext());
if (addDepInfo)
- prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
+ prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx.shared());
}
}
}
@@ -630,6 +630,11 @@ public abstract class GridCacheMessage implements Message {
}
/** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -642,18 +647,12 @@ public abstract class GridCacheMessage implements Message {
switch (writer.state()) {
case 0:
- if (!writer.writeInt("cacheId", cacheId))
- return false;
-
- writer.incrementState();
-
- case 1:
if (!writer.writeMessage("depInfo", depInfo))
return false;
writer.incrementState();
- case 2:
+ case 1:
if (!writer.writeLong("msgId", msgId))
return false;
@@ -673,14 +672,6 @@ public abstract class GridCacheMessage implements Message {
switch (reader.state()) {
case 0:
- cacheId = reader.readInt("cacheId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
depInfo = reader.readMessage("depInfo");
if (!reader.isLastRead())
@@ -688,7 +679,7 @@ public abstract class GridCacheMessage implements Message {
reader.incrementState();
- case 2:
+ case 1:
msgId = reader.readLong("msgId");
if (!reader.isLastRead())
@@ -714,6 +705,6 @@ public abstract class GridCacheMessage implements Message {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridCacheMessage.class, this, "cacheId", cacheId);
+ return S.toString(GridCacheMessage.class, this);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b6dcf33..c5401e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -387,18 +387,26 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
try {
- GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId);
-
- if (cacheCtx != null) {
- if (m instanceof GridDhtPartitionSupplyMessage)
- cacheCtx.preloader().handleSupplyMessage(
- idx, id, (GridDhtPartitionSupplyMessage)m);
- else if (m instanceof GridDhtPartitionDemandMessage)
- cacheCtx.preloader().handleDemandMessage(
- idx, id, (GridDhtPartitionDemandMessage)m);
+ if (m instanceof GridCacheGroupIdMessage) {
+ CacheGroupInfrastructure grp = cctx.cache().cacheGroup(((GridCacheGroupIdMessage)m).groupId());
+
+ if (grp != null) {
+ if (m instanceof GridDhtPartitionSupplyMessage) {
+ grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m);
+
+ return;
+ }
+ else if (m instanceof GridDhtPartitionDemandMessage) {
+ grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m);
+
+ return;
+ }
+ }
else
- U.error(log, "Unsupported message type: " + m.getClass().getName());
+ U.warn(log, "Failed to find cache group [msg=" + m + ']');
}
+
+ U.error(log, "Unsupported message type: " + m.getClass().getName());
}
finally {
leaveBusy();
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index db1a2e9..d005aae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -136,15 +137,15 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(GridCacheContext ctx, Collection<KeyCacheObject> keys,
+ @Override public GridDhtFuture<Object> request(GridCacheContext ctx, Collection<KeyCacheObject> keys,
AffinityTopologyVersion topVer) {
- return new GridFinishedFuture<>();
+ return null;
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(GridCacheContext ctx, GridNearAtomicAbstractUpdateRequest req,
+ @Override public GridDhtFuture<Object> request(GridCacheContext ctx, GridNearAtomicAbstractUpdateRequest req,
AffinityTopologyVersion topVer) {
- return new GridFinishedFuture<>();
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2238dc1..4239fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1838,11 +1838,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- if (grp == null)
- grp = startCacheGroup(grpDesc, affNode, cacheObjCtx, exchTopVer);
+ if (grp == null) {
+ grp = startCacheGroup(grpDesc,
+ cacheType,
+ affNode,
+ cacheObjCtx,
+ exchTopVer);
+ }
+ }
+ else {
+ grp = startCacheGroup(grpDesc,
+ cacheType,
+ affNode,
+ cacheObjCtx,
+ exchTopVer);
}
- else
- grp = startCacheGroup(grpDesc, affNode, cacheObjCtx, exchTopVer);
GridCacheContext cacheCtx = createCache(ccfg,
grp,
@@ -1872,6 +1882,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private CacheGroupInfrastructure startCacheGroup(
CacheGroupDescriptor desc,
+ CacheType cacheType,
boolean affNode,
CacheObjectContext cacheObjCtx,
AffinityTopologyVersion exchTopVer)
@@ -1886,6 +1897,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheGroupInfrastructure grp = new CacheGroupInfrastructure(sharedCtx,
desc.groupId(),
+ cacheType,
cfg,
affNode,
memPlc,
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
index 62b67b1..bfe6eee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
@@ -59,6 +59,11 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage {
this.err = err;
}
+ /** {@inheritDoc} */
+ @Override public int handlerId() {
+ return 0;
+ }
+
/**
*
*/
@@ -106,13 +111,13 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage {
}
switch (writer.state()) {
- case 3:
+ case 2:
if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
- case 4:
+ case 3:
if (!writer.writeUuid("requestId", requestId))
return false;
@@ -134,7 +139,7 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage {
return false;
switch (reader.state()) {
- case 3:
+ case 2:
errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
@@ -142,7 +147,7 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage {
reader.incrementState();
- case 4:
+ case 3:
requestId = reader.readUuid("requestId");
if (!reader.isLastRead())
@@ -162,7 +167,7 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 4;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 5d1885e..c092132 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -38,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
-public class GridCacheTtlUpdateRequest extends GridCacheMessage {
+public class GridCacheTtlUpdateRequest extends GridCacheIdMessage {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 630c79f..65b16a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
@@ -37,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Base for all messages in replicated cache.
*/
-public abstract class GridDistributedBaseMessage extends GridCacheMessage implements GridCacheDeployable,
+public abstract class GridDistributedBaseMessage extends GridCacheIdMessage implements GridCacheDeployable,
GridCacheVersionable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 561c292..79db810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -71,6 +71,11 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
}
/** {@inheritDoc} */
+ @Override public int handlerId() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
@Override public final int partition() {
return part;
}
@@ -135,25 +140,25 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
}
switch (writer.state()) {
- case 3:
+ case 2:
if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
- case 4:
+ case 3:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 5:
+ case 4:
if (!writer.writeInt("part", part))
return false;
writer.incrementState();
- case 6:
+ case 5:
if (!writer.writeMessage("txId", txId))
return false;
@@ -175,7 +180,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
return false;
switch (reader.state()) {
- case 3:
+ case 2:
flags = reader.readByte("flags");
if (!reader.isLastRead())
@@ -183,7 +188,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
reader.incrementState();
- case 4:
+ case 3:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -191,7 +196,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
reader.incrementState();
- case 5:
+ case 4:
part = reader.readInt("part");
if (!reader.isLastRead())
@@ -199,7 +204,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
reader.incrementState();
- case 6:
+ case 5:
txId = reader.readMessage("txId");
if (!reader.isLastRead())
@@ -219,7 +224,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 6;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 0b3080e..14eb92f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Affinity assignment request.
*/
-public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
+public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -45,17 +45,17 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param topVer Topology version.
* @param waitTopVer Topology version to wait for before message processing.
*/
- public GridDhtAffinityAssignmentRequest(int cacheId,
+ public GridDhtAffinityAssignmentRequest(int grpId,
AffinityTopologyVersion topVer,
AffinityTopologyVersion waitTopVer) {
assert topVer != null;
assert waitTopVer != null;
- this.cacheId = cacheId;
+ this.grpId = grpId;
this.topVer = topVer;
this.waitTopVer = waitTopVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..6c01c8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -27,20 +27,18 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.NotNull;
/**
* Affinity assignment response.
*/
-public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
+public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -69,14 +67,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param topVer Topology version.
* @param affAssignment Affinity assignment.
*/
- public GridDhtAffinityAssignmentResponse(int cacheId,
+ public GridDhtAffinityAssignmentResponse(int grpId,
@NotNull AffinityTopologyVersion topVer,
List<List<ClusterNode>> affAssignment) {
- this.cacheId = cacheId;
+ this.grpId = grpId;
this.topVer = topVer;
affAssignmentIds = ids(affAssignment);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index ea6ca06..87abd6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -174,7 +174,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
}
if (preloadEntries != null)
- marshalInfos(preloadEntries, cctx);
+ marshalInfos(preloadEntries, cctx.shared(), cctx.cacheObjectContext());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index d777a22..6d717eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -173,19 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 7:
+ case 6:
if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
return false;
writer.incrementState();
- case 8:
+ case 7:
if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 9:
+ case 8:
if (!writer.writeMessage("retVal", retVal))
return false;
@@ -207,7 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 7:
+ case 6:
checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
if (!reader.isLastRead())
@@ -215,7 +215,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 8:
+ case 7:
miniId = reader.readInt("miniId");
if (!reader.isLastRead())
@@ -223,7 +223,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 9:
+ case 8:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -243,7 +243,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 9;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
index c483408..3b68a5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -47,6 +47,11 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public int handlerId() {
+ return 0;
+ }
+
/**
*
* @param vers Near Tx xid Versions.
@@ -87,7 +92,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
}
switch (writer.state()) {
- case 3:
+ case 2:
if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG))
return false;
@@ -109,7 +114,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
return false;
switch (reader.state()) {
- case 3:
+ case 2:
vers = reader.readCollection("vers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -129,6 +134,6 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 3;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e2b7803..bd238d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1086,7 +1086,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
Collection<KeyCacheObject> keys = entry.getValue();
- lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+ GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+ lastForceFut = ctx.group().preloader().request(ctx, keys, tx.topologyVersion());
if (compFut != null && lastForceFut != null)
compFut.add(lastForceFut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 579796d..d2dc817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
/**
*
*/
-public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable {
/** Skip store flag bit mask. */
private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e477592..8a4c3c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -239,10 +239,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
metrics = m;
- preldr = new GridDhtPreloader(ctx);
-
- preldr.start();
-
ctx.io().addHandler(
ctx.cacheId(),
GridNearGetRequest.class,
@@ -1611,7 +1607,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridNearAtomicAbstractUpdateRequest req,
final UpdateReplyClosure completionCb
) {
- IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
+ IgniteInternalFuture<Object> forceFut = ctx.group().preloader().request(ctx, req, req.topologyVersion());
if (forceFut == null || forceFut.isDone()) {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 92ef149..0c069da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridLongList;
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Deferred dht atomic update response.
*/
-public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicDeferredUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index d6e2db0..71d2321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -36,7 +36,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic
/**
* Message sent from DHT nodes to near node in FULL_SYNC mode.
*/
-public class GridDhtAtomicNearResponse extends GridCacheMessage {
+public class GridDhtAtomicNearResponse extends GridCacheIdMessage {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 693d658..7b2547a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
@@ -27,7 +26,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -39,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* DHT atomic cache backup update response.
*/
-public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 4b3ea5bc..bb47af4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable;
/**
*
*/
-public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable {
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/93724584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
index 4b9109e..96be023 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
-public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
+public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage {
/** */
private static final long serialVersionUID = 0L;