You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/25 15:34:27 UTC
[ignite] branch ignite-cdc updated: IGNITE-13596 Flag to
distinguish DataRecord on primary and backup added (#8904)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch ignite-cdc
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-cdc by this push:
new db8bb81 IGNITE-13596 Flag to distinguish DataRecord on primary and backup added (#8904)
db8bb81 is described below
commit db8bb814c5c5a4876b21de7a22126ed74d6ffddf
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Mar 25 18:34:03 2021 +0300
IGNITE-13596 Flag to distinguish DataRecord on primary and backup added (#8904)
---
.../util/GridCommandHandlerClusterByClassTest.java | 3 +-
.../apache/ignite/util/GridCommandHandlerTest.java | 3 +-
.../internal/pagemem/wal/record/DataEntry.java | 16 +++-
.../internal/pagemem/wal/record/DataRecord.java | 2 +-
.../internal/pagemem/wal/record/LazyDataEntry.java | 6 +-
.../internal/pagemem/wal/record/MvccDataEntry.java | 2 +-
.../pagemem/wal/record/UnwrapDataEntry.java | 6 +-
.../internal/pagemem/wal/record/WALRecord.java | 23 ++++-
.../processors/cache/GridCacheMapEntry.java | 42 ++++++---
.../GridDistributedTxRemoteAdapter.java | 3 +-
.../dht/colocated/GridDhtDetachedCacheEntry.java | 8 +-
.../cache/distributed/near/GridNearCacheEntry.java | 8 +-
.../GridCacheDatabaseSharedManager.java | 3 +
.../wal/reader/StandaloneWalRecordsIterator.java | 7 +-
.../wal/serializer/RecordDataV1Serializer.java | 47 ++++++----
.../wal/serializer/RecordDataV2Serializer.java | 10 ++-
.../cache/transactions/IgniteTxLocalAdapter.java | 3 +-
.../java/org/apache/ignite/TestStorageUtils.java | 3 +-
.../IgnitePdsSporadicDataRecordsOnBackupTest.java | 2 +-
...CheckpointSimulationWithRealCpDisabledTest.java | 2 +-
.../persistence/db/wal/IgniteWalRebalanceTest.java | 3 +-
.../db/wal/reader/IgniteWalReaderTest.java | 100 ++++++++++++++++++++-
.../testframework/wal/record/RecordUtils.java | 9 ++
.../ignite/development/utils/DataEntryWrapper.java | 3 +-
.../apache/ignite/development/utils/WalStat.java | 2 +-
.../utils/IgniteWalConverterArgumentsTest.java | 4 +-
.../utils/IgniteWalConverterSensitiveDataTest.java | 3 +-
.../development/utils/IgniteWalConverterTest.java | 4 +-
28 files changed, 257 insertions(+), 70 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
index 4014d4e..aa55b91 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
@@ -1523,7 +1523,8 @@ public class GridCommandHandlerClusterByClassTest extends GridCommandHandlerClus
new GridCacheVersion(),
0L,
partId,
- updateCntr
+ updateCntr,
+ false
);
GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 80c33ee..0b525ee 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -2630,7 +2630,8 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
new GridCacheVersion(),
0L,
partId,
- updateCntr
+ updateCntr,
+ false
);
GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index dd05726..2244897 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -61,6 +61,10 @@ public class DataEntry {
@GridToStringInclude
protected long partCnt;
+ /** If {@code true}, the change was made on the primary node. */
+ @GridToStringInclude
+ protected boolean primary;
+
/** Constructor. */
private DataEntry() {
// No-op, used from factory methods.
@@ -76,6 +80,7 @@ public class DataEntry {
* @param expireTime Expire time.
* @param partId Partition ID.
* @param partCnt Partition counter.
+ * @param primary {@code True} if node is primary for partition at the moment of logging.
*/
public DataEntry(
int cacheId,
@@ -86,7 +91,8 @@ public class DataEntry {
GridCacheVersion writeVer,
long expireTime,
int partId,
- long partCnt
+ long partCnt,
+ boolean primary
) {
this.cacheId = cacheId;
this.key = key;
@@ -97,6 +103,7 @@ public class DataEntry {
this.expireTime = expireTime;
this.partId = partId;
this.partCnt = partCnt;
+ this.primary = primary;
// Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op;
@@ -177,6 +184,13 @@ public class DataEntry {
return expireTime;
}
+ /**
+ * @return {@code True} if node is primary for partition at the moment of logging.
+ */
+ public boolean primary() {
+ return primary;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataEntry.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index ef6c3ba..2507fd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -35,7 +35,7 @@ public class DataRecord extends TimeStampRecord {
/** {@inheritDoc} */
@Override public RecordType type() {
- return RecordType.DATA_RECORD;
+ return RecordType.DATA_RECORD_V2;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
index ba2fabc..e771bdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
@@ -60,6 +60,7 @@ public class LazyDataEntry extends DataEntry implements MarshalledDataEntry {
* @param expireTime Expire time.
* @param partId Partition ID.
* @param partCnt Partition counter.
+ * @param primary {@code True} if node is primary for partition at the moment of logging.
*/
public LazyDataEntry(
GridCacheSharedContext cctx,
@@ -73,9 +74,10 @@ public class LazyDataEntry extends DataEntry implements MarshalledDataEntry {
GridCacheVersion writeVer,
long expireTime,
int partId,
- long partCnt
+ long partCnt,
+ boolean primary
) {
- super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+ super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, primary);
this.cctx = cctx;
this.keyType = keyType;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
index c86593f..cecd867 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
@@ -56,7 +56,7 @@ public class MvccDataEntry extends DataEntry {
long partCnt,
MvccVersion mvccVer
) {
- super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+ super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, false);
this.mvccVer = mvccVer;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
index ad5e1d0..9395142 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
@@ -48,6 +48,7 @@ public class UnwrapDataEntry extends DataEntry implements UnwrappedDataEntry {
* @param partCnt Partition counter.
* @param cacheObjValCtx cache object value context for unwrapping objects.
* @param keepBinary disable unwrapping for non primitive objects, Binary Objects would be returned instead.
+ * @param primary {@code True} if node is primary for partition at the moment of logging.
*/
public UnwrapDataEntry(
final int cacheId,
@@ -60,8 +61,9 @@ public class UnwrapDataEntry extends DataEntry implements UnwrappedDataEntry {
final int partId,
final long partCnt,
final CacheObjectValueContext cacheObjValCtx,
- final boolean keepBinary) {
- super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+ final boolean keepBinary,
+ final boolean primary) {
+ super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, primary);
this.cacheObjValCtx = cacheObjValCtx;
this.keepBinary = keepBinary;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index f07b71a..2f95c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -45,7 +45,8 @@ public abstract class WALRecord {
/** */
PAGE_RECORD(1, PHYSICAL),
- /** */
+ /** @deprecated Use {@link #DATA_RECORD_V2} instead. */
+ @Deprecated
DATA_RECORD(2, LOGICAL),
/** Checkpoint (begin) record */
@@ -206,7 +207,11 @@ public abstract class WALRecord {
/** Encrypted WAL-record. */
ENCRYPTED_RECORD(52, PHYSICAL),
- /** Ecnrypted data record. */
+ /**
+ * Encrypted data record.
+ * @deprecated Use {@link #ENCRYPTED_DATA_RECORD_V3} instead.
+ */
+ @Deprecated
ENCRYPTED_DATA_RECORD(53, LOGICAL),
/** Mvcc data record. */
@@ -239,7 +244,11 @@ public abstract class WALRecord {
/** Encrypted WAL-record. */
ENCRYPTED_RECORD_V2(63, PHYSICAL),
- /** Ecnrypted data record. */
+ /**
+ * Encrypted data record.
+ * @deprecated Use {@link #ENCRYPTED_DATA_RECORD_V3} instead.
+ */
+ @Deprecated
ENCRYPTED_DATA_RECORD_V2(64, LOGICAL),
/** Master key change record containing multiple keys for single cache group. */
@@ -252,7 +261,13 @@ public abstract class WALRecord {
PARTITION_META_PAGE_DELTA_RECORD_V3(67, PHYSICAL),
/** Index meta page delta record includes encryption status data. */
- INDEX_META_PAGE_DELTA_RECORD(68, PHYSICAL);
+ INDEX_META_PAGE_DELTA_RECORD(68, PHYSICAL),
+
+ /** Data record V2. */
+ DATA_RECORD_V2(69, LOGICAL),
+
+ /** Encrypted data record V3. */
+ ENCRYPTED_DATA_RECORD_V3(70, LOGICAL);
/** Index for serialization. Should be consistent throughout all versions. */
private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b0221a7..3807404 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1569,7 +1569,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateCntr0 = nextPartitionCounter(tx, updateCntr);
if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = logTxUpdate(tx, val, expireTime, updateCntr0);
+ logPtr = logTxUpdate(tx, val, expireTime, updateCntr0, topVer);
update(val, expireTime, ttl, newVer, true);
@@ -1791,7 +1791,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateCntr0 = nextPartitionCounter(tx, updateCntr);
if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = logTxUpdate(tx, null, 0, updateCntr0);
+ logPtr = logTxUpdate(tx, null, 0, updateCntr0, topVer);
drReplicate(drType, null, newVer, topVer);
@@ -2148,7 +2148,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(updated, expireTime, ttl, ver, true);
- logUpdate(op, updated, ver, expireTime, 0);
+ logUpdate(op, updated, ver, expireTime, 0, cctx.affinity().affinityTopologyVersion());
if (evt) {
CacheObject evtOld = null;
@@ -2180,7 +2180,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
- logUpdate(op, null, ver, CU.EXPIRE_TIME_ETERNAL, 0);
+ logUpdate(op, null, ver, CU.EXPIRE_TIME_ETERNAL, 0, cctx.affinity().affinityTopologyVersion());
if (evt) {
CacheObject evtOld = null;
@@ -3495,7 +3495,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ver,
expireTime,
partition(),
- updateCntr
+ updateCntr,
+ cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer)
)));
}
}
@@ -4323,9 +4324,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param writeVer Write version.
* @param expireTime Expire time.
* @param updCntr Update counter.
+ * @param topVer Topology version.
*/
- protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer, long expireTime, long updCntr)
- throws IgniteCheckedException {
+ protected void logUpdate(
+ GridCacheOperation op,
+ CacheObject val,
+ GridCacheVersion writeVer,
+ long expireTime,
+ long updCntr,
+ AffinityTopologyVersion topVer
+ ) throws IgniteCheckedException {
// We log individual updates only in ATOMIC cache.
assert cctx.atomic();
@@ -4340,7 +4348,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
writeVer,
expireTime,
partition(),
- updCntr)));
+ updCntr,
+ cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer))));
}
catch (StorageException e) {
throw new IgniteCheckedException("Failed to log ATOMIC cache update [key=" + key + ", op=" + op +
@@ -4353,10 +4362,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param val Value.
* @param expireTime Expire time (or 0 if not applicable).
* @param updCntr Update counter.
+ * @param topVer Topology version.
* @throws IgniteCheckedException In case of log failure.
*/
- protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
- throws IgniteCheckedException {
+ protected WALPointer logTxUpdate(
+ IgniteInternalTx tx,
+ CacheObject val,
+ long expireTime,
+ long updCntr,
+ AffinityTopologyVersion topVer
+ ) throws IgniteCheckedException {
assert cctx.transactional() && !cctx.transactionalSnapshot();
if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked()
@@ -4375,7 +4390,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
tx.writeVersion(),
expireTime,
key.partition(),
- updCntr)));
+ updCntr,
+ cctx.affinity().primaryByPartition(cctx.localNode(), partition(), topVer))));
}
else
return null;
@@ -6484,7 +6500,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr);
- entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
+ entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0, topVer);
if (!entry.isNear()) {
newRow = entry.localPartition().dataStore().createRow(
@@ -6571,7 +6587,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr);
- entry.logUpdate(op, null, newVer, 0, updateCntr0);
+ entry.logUpdate(op, null, newVer, 0, updateCntr0, topVer);
if (oldVal != null) {
assert !entry.deletedUnlocked();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 173888b..7cfe869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -631,7 +631,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
writeVersion(),
0,
txEntry.key().partition(),
- txEntry.updateCounter()
+ txEntry.updateCounter(),
+ cacheCtx.affinity().primaryByPartition(cctx.localNode(), txEntry.key().partition(), topVer)
),
txEntry
)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 48b8e8e..8e46629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -73,13 +74,14 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer, long expireTime, long updCntr) throws IgniteCheckedException {
+ @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion writeVer,
+ long expireTime, long updCntr, AffinityTopologyVersion topVer) {
// No-op for detached entries, index is updated on primary or backup nodes.
}
/** {@inheritDoc} */
- @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
- throws IgniteCheckedException {
+ @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
+ AffinityTopologyVersion topVer) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index d8b1615..fc70aa3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -471,14 +471,14 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion ver, long expireTime, long updCntr)
- throws IgniteCheckedException {
+ @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion ver, long expireTime,
+ long updCntr, AffinityTopologyVersion topVer) {
// No-op: queries are disabled for near cache.
}
/** {@inheritDoc} */
- @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
- throws IgniteCheckedException {
+ @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
+ AffinityTopologyVersion topVer) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 8fa76c8..8c0228e 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2496,6 +2496,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
switch (rec.type()) {
case MVCC_DATA_RECORD:
case DATA_RECORD:
+ case DATA_RECORD_V2:
checkpointReadLock();
try {
@@ -2635,8 +2636,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
case MVCC_DATA_RECORD:
case DATA_RECORD:
+ case DATA_RECORD_V2:
case ENCRYPTED_DATA_RECORD:
case ENCRYPTED_DATA_RECORD_V2:
+ case ENCRYPTED_DATA_RECORD_V3:
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 472afba..cfaa670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -354,7 +354,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
GridKernalContext kernalCtx = sharedCtx.kernalContext();
IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
- if (processor != null && (rec.type() == RecordType.DATA_RECORD || rec.type() == RecordType.MVCC_DATA_RECORD)) {
+ if (processor != null && (rec.type() == RecordType.DATA_RECORD
+ || rec.type() == RecordType.DATA_RECORD_V2
+ || rec.type() == RecordType.MVCC_DATA_RECORD)) {
try {
return postProcessDataRecord((DataRecord)rec, kernalCtx, processor);
}
@@ -498,7 +500,8 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
dataEntry.partitionId(),
dataEntry.partitionCounter(),
coCtx,
- keepBinary);
+ keepBinary,
+ dataEntry.primary());
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index bc5f9e3..6ad08dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecordV2;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
@@ -121,7 +122,9 @@ import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V3;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD_V2;
@@ -406,7 +409,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case PARTITION_DESTROY:
return /*cacheId*/4 + /*partId*/4;
- case DATA_RECORD:
+ case DATA_RECORD_V2:
DataRecord dataRec = (DataRecord)record;
return 4 + dataSize(dataRec);
@@ -667,12 +670,13 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case DATA_RECORD:
+ case DATA_RECORD_V2:
int entryCnt = in.readInt();
List<DataEntry> entries = new ArrayList<>(entryCnt);
for (int i = 0; i < entryCnt; i++)
- entries.add(readPlainDataEntry(in));
+ entries.add(readPlainDataEntry(in, type));
res = new DataRecord(entries, 0L);
@@ -680,12 +684,13 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
case ENCRYPTED_DATA_RECORD:
case ENCRYPTED_DATA_RECORD_V2:
+ case ENCRYPTED_DATA_RECORD_V3:
entryCnt = in.readInt();
entries = new ArrayList<>(entryCnt);
for (int i = 0; i < entryCnt; i++)
- entries.add(readEncryptedDataEntry(in, type == ENCRYPTED_DATA_RECORD_V2));
+ entries.add(readEncryptedDataEntry(in, type));
res = new DataRecord(entries, 0L);
@@ -1352,6 +1357,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
break;
case DATA_RECORD:
+ case DATA_RECORD_V2:
DataRecord dataRec = (DataRecord)rec;
buf.putInt(dataRec.writeEntries().size());
@@ -1956,6 +1962,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
buf.putInt(entry.partitionId());
buf.putLong(entry.partitionCounter());
buf.putLong(entry.expireTime());
+
+ if (!(entry instanceof MvccDataEntry))
+ buf.put(entry.primary() ? (byte)1 : 0);
}
/**
@@ -2002,14 +2011,16 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
/**
* @param in Input to read from.
- * @param readKeyId If {@code true} encryption key identifier will be read from {@code in}.
+ * @param recType Record type.
* @return Read entry.
* @throws IOException If failed.
* @throws IgniteCheckedException If failed.
*/
- DataEntry readEncryptedDataEntry(ByteBufferBackedDataInput in, boolean readKeyId) throws IOException, IgniteCheckedException {
+ DataEntry readEncryptedDataEntry(ByteBufferBackedDataInput in, RecordType recType) throws IOException, IgniteCheckedException {
boolean needDecryption = in.readByte() == ENCRYPTED;
+ RecordType dataRecordType = recType == ENCRYPTED_DATA_RECORD_V3 ? DATA_RECORD_V2 : DATA_RECORD;
+
if (needDecryption) {
if (encSpi == null) {
skipEncryptedRecord(in, false);
@@ -2017,22 +2028,23 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
return new EncryptedDataEntry();
}
- T3<ByteBufferBackedDataInput, Integer, RecordType> clData = readEncryptedData(in, false, readKeyId);
+ T3<ByteBufferBackedDataInput, Integer, RecordType> clData = readEncryptedData(in, false,
+ recType == ENCRYPTED_DATA_RECORD_V2 || recType == ENCRYPTED_DATA_RECORD_V3);
if (clData.get1() == null)
return null;
- return readPlainDataEntry(clData.get1());
+ return readPlainDataEntry(clData.get1(), dataRecordType);
}
- return readPlainDataEntry(in);
+ return readPlainDataEntry(in, dataRecordType);
}
/**
* @param in Input to read from.
* @return Read entry.
*/
- DataEntry readPlainDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+ DataEntry readPlainDataEntry(ByteBufferBackedDataInput in, RecordType type) throws IOException, IgniteCheckedException {
int cacheId = in.readInt();
int keySize = in.readInt();
@@ -2061,6 +2073,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
int partId = in.readInt();
long partCntr = in.readLong();
long expireTime = in.readLong();
+ boolean primary = type == DATA_RECORD_V2 && in.readByte() == (byte)1;
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
@@ -2083,7 +2096,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
writeVer,
expireTime,
partId,
- partCntr
+ partCntr,
+ primary
);
}
else
@@ -2099,7 +2113,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
writeVer,
expireTime,
partId,
- partCntr);
+ partCntr,
+ primary
+ );
}
/**
@@ -2113,10 +2129,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
if (needEncryption(rec))
return ENCRYPTED_RECORD_V2;
- if (rec.type() != DATA_RECORD)
+ if (rec.type() != DATA_RECORD && rec.type() != DATA_RECORD_V2)
return rec.type();
- return isDataRecordEncrypted((DataRecord)rec) ? ENCRYPTED_DATA_RECORD_V2 : DATA_RECORD;
+ return isDataRecordEncrypted((DataRecord)rec) ? ENCRYPTED_DATA_RECORD_V3 : rec.type();
}
/**
@@ -2235,7 +2251,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
/*write ver*/CacheVersionIO.size(entry.writeVersion(), false) +
/*part ID*/4 +
/*expire Time*/8 +
- /*part cnt*/8;
+ /*part cnt*/8 +
+ /*primary*/(entry instanceof MvccDataEntry ? 0 : 1);
}
/**
@@ -2268,7 +2285,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
public static class EncryptedDataEntry extends DataEntry {
/** Constructor. */
EncryptedDataEntry() {
- super(0, null, null, READ, null, null, 0, 0, 0);
+ super(0, null, null, READ, null, null, 0, 0, 0, false);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 8622629..08f132a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -56,8 +56,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
-
/**
* Record data V2 serializer.
*/
@@ -96,6 +94,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
return 4/*entry count*/ + 8/*timestamp*/ + dataSize((DataRecord)rec);
case DATA_RECORD:
+ case DATA_RECORD_V2:
return super.plainSize(rec) + 8/*timestamp*/;
case SNAPSHOT:
@@ -160,13 +159,14 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
return cpRec;
case DATA_RECORD:
+ case DATA_RECORD_V2:
int entryCnt = in.readInt();
long timeStamp = in.readLong();
List<DataEntry> entries = new ArrayList<>(entryCnt);
for (int i = 0; i < entryCnt; i++)
- entries.add(readPlainDataEntry(in));
+ entries.add(readPlainDataEntry(in, type));
return new DataRecord(entries, timeStamp);
@@ -183,13 +183,14 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
case ENCRYPTED_DATA_RECORD:
case ENCRYPTED_DATA_RECORD_V2:
+ case ENCRYPTED_DATA_RECORD_V3:
entryCnt = in.readInt();
timeStamp = in.readLong();
entries = new ArrayList<>(entryCnt);
for (int i = 0; i < entryCnt; i++)
- entries.add(readEncryptedDataEntry(in, type == ENCRYPTED_DATA_RECORD_V2));
+ entries.add(readEncryptedDataEntry(in, type));
return new DataRecord(entries, timeStamp);
@@ -261,6 +262,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
case MVCC_DATA_RECORD:
case DATA_RECORD:
+ case DATA_RECORD_V2:
DataRecord dataRec = (DataRecord)rec;
buf.putInt(dataRec.writeEntries().size());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7b6cd7f..208a9a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -861,7 +861,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
writeVersion(),
0,
txEntry.key().partition(),
- txEntry.updateCounter())));
+ txEntry.updateCounter(),
+ cacheCtx.affinity().primaryByPartition(cctx.localNode(), txEntry.key().partition(), topVer))));
}
ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
diff --git a/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java b/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java
index 17ff241..e0f415d 100644
--- a/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/TestStorageUtils.java
@@ -82,7 +82,8 @@ public class TestStorageUtils {
ver,
0L,
partId,
- updateCntr
+ updateCntr,
+ false
);
IgniteCacheDatabaseSharedManager db = ctx.shared().database();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java
index d82d263..55b6b61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSporadicDataRecordsOnBackupTest.java
@@ -158,7 +158,7 @@ public class IgnitePdsSporadicDataRecordsOnBackupTest extends GridCommonAbstract
params.bufferSize(1024 * 1024);
params.filesOrDirs(walDir, walArchiveDir);
- params.filter((type, pointer) -> type == WALRecord.RecordType.DATA_RECORD);
+ params.filter((type, pointer) -> type == WALRecord.RecordType.DATA_RECORD_V2);
int cacheId = CU.cacheId(TX_CACHE_NAME);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index f829b90..a8dd46c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -358,7 +358,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
cctx.affinity().partition(i), i, new MvccVersionImpl(1000L, 10L, i + 1 /* Non-zero */)) :
new DataEntry(cctx.cacheId(), key, val, op, null, cctx.cache().nextVersion(),
0L,
- cctx.affinity().partition(i), i));
+ cctx.affinity().partition(i), i, false));
}
UUID cpId = UUID.randomUUID();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 7567705..422cc23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -763,7 +763,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
new GridCacheVersion(0, 1, 1, 0),
0,
0,
- 0
+ 0,
+ false
)));
File walDir = U.field(walMgr, "walWorkDir");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index 005d491..5c6c7c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -91,7 +91,7 @@ import static java.util.Arrays.fill;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MVCC_DATA_RECORD;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
@@ -132,6 +132,12 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
/** Whether to enable WAL archive compaction. */
private boolean enableWalCompaction;
+ /** Backup count. */
+ private int backupCnt;
+
+ /** DataEntry from primary flag. */
+ private boolean primary = true;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -142,6 +148,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
ccfg.setIndexedTypes(Integer.class, IndexedObject.class);
+ ccfg.setBackups(backupCnt);
cfg.setCacheConfiguration(ccfg);
@@ -266,13 +273,16 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
WALRecord walRecord = tup.get2();
- if (walRecord.type() == DATA_RECORD || walRecord.type() == MVCC_DATA_RECORD) {
+ if (walRecord.type() == DATA_RECORD_V2 || walRecord.type() == MVCC_DATA_RECORD) {
DataRecord record = (DataRecord)walRecord;
for (DataEntry entry : record.writeEntries()) {
KeyCacheObject key = entry.key();
CacheObject val = entry.value();
+ if (walRecord.type() == DATA_RECORD_V2)
+ assertEquals(primary, entry.primary());
+
if (DUMP_RECORDS)
log.info("Op: " + entry.op() + ", Key: " + key + ", Value: " + val);
}
@@ -1046,6 +1056,87 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
* @throws Exception if failed.
*/
@Test
+ public void testPrimaryFlagOnTwoNodes() throws Exception {
+ backupCnt = 1; // picked up by getConfiguration() through ccfg.setBackups(backupCnt)
+
+ IgniteEx ignite = startGrid("node0");
+ Ignite ignite1 = startGrid(1);
+
+ ignite.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, IndexedObject> cache = ignite.cache(CACHE_NAME);
+
+ backupCnt = 0; // back to the field's default value; both grids are already started at this point
+
+ int cntEntries = 100;
+
+ List<Integer> keys = findKeys(ignite.localNode(), cache, cntEntries, 0, 0); // NOTE(review): presumably keys owned by node0 - confirm in findKeys
+
+ Map<Integer, IndexedObject> map = new TreeMap<>();
+
+ for (Integer key : keys)
+ map.putIfAbsent(key, new IndexedObject(key));
+
+ cache.putAll(map);
+
+ ignite.cluster().active(false);
+
+ String subfolderName1 = genDbSubfolderName(ignite, 0);
+ String subfolderName2 = genDbSubfolderName(ignite1, 1);
+
+ stopAllGrids();
+
+ String workDir = U.defaultWorkDirectory();
+
+ IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
+
+ Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class);
+
+ IgniteInClosure<DataRecord> drHnd = dataRecord -> { // counts entries per operation; the tally is not asserted in this method
+ List<? extends DataEntry> entries = dataRecord.writeEntries();
+
+ for (DataEntry entry : entries) {
+ GridCacheOperation op = entry.op();
+ Integer cnt = operationsFound.get(op);
+
+ operationsFound.put(op, cnt == null ? 1 : (cnt + 1));
+ }
+ };
+
+ scanIterateAndCount( // first node's WAL dirs; the 'primary' field still holds its initial value (true)
+ factory,
+ createIteratorParametersBuilder(workDir, subfolderName1)
+ .filesOrDirs(
+ workDir + "/db/wal/" + subfolderName1,
+ workDir + "/db/wal/archive/" + subfolderName1
+ ),
+ 1,
+ 1,
+ null, drHnd
+ );
+
+ primary = false; // value compared by the assertEquals(primary, entry.primary()) checks added in this patch
+
+ scanIterateAndCount( // second node's WAL dirs, scanned with primary == false
+ factory,
+ createIteratorParametersBuilder(workDir, subfolderName2)
+ .filesOrDirs(
+ workDir + "/db/wal/" + subfolderName2,
+ workDir + "/db/wal/archive/" + subfolderName2
+ ),
+ 1,
+ 1,
+ null,
+ drHnd
+ );
+ }
+
+ /**
+ * Tests transaction generation and WAL for putAll cache operation.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
public void testPutAllTxIntoTwoNodes() throws Exception {
Ignite ignite = startGrid("node0");
Ignite ignite1 = startGrid(1);
@@ -1397,7 +1488,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
//noinspection EnumSwitchStatementWhichMissesCases
switch (type) {
- case DATA_RECORD:
+ case DATA_RECORD_V2:
// Fallthrough.
case MVCC_DATA_RECORD: {
assert walRecord instanceof DataRecord;
@@ -1410,6 +1501,9 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
List<DataEntry> entries = dataRecord.writeEntries();
for (DataEntry entry : entries) {
+ if (walRecord.type() == DATA_RECORD_V2)
+ assertEquals(primary, entry.primary());
+
GridCacheVersion globalTxId = entry.nearXidVersion();
Object unwrappedKeyObj;
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 27aa08a..41bf1ef 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -112,8 +112,10 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_PAGE_UPDATE_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V3;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.EXCHANGE;
@@ -170,6 +172,7 @@ public class RecordUtils {
put(TX_RECORD, RecordUtils::buildTxRecord);
put(PAGE_RECORD, RecordUtils::buildPageSnapshot);
put(DATA_RECORD, RecordUtils::buildDataRecord);
+ put(DATA_RECORD_V2, RecordUtils::buildDataRecord);
put(CHECKPOINT_RECORD, RecordUtils::buildCheckpointRecord);
put(HEADER_RECORD, RecordUtils::buildHeaderRecord);
put(INIT_NEW_PAGE_RECORD, RecordUtils::buildInitNewPageRecord);
@@ -231,6 +234,7 @@ public class RecordUtils {
put(ENCRYPTED_DATA_RECORD, RecordUtils::buildEncryptedDataRecord);
put(ENCRYPTED_RECORD_V2, RecordUtils::buildEncryptedRecordV2);
put(ENCRYPTED_DATA_RECORD_V2, RecordUtils::buildEncryptedDataRecordV2);
+ put(ENCRYPTED_DATA_RECORD_V3, RecordUtils::buildEncryptedDataRecordV3);
put(MVCC_DATA_RECORD, RecordUtils::buildMvccDataRecord);
put(MVCC_TX_RECORD, RecordUtils::buildMvccTxRecord);
put(CONSISTENT_CUT, RecordUtils::buildConsistentCutRecord);
@@ -594,6 +598,11 @@ public class RecordUtils {
}
/** **/
+ public static UnsupportedWalRecord buildEncryptedDataRecordV3() {
+ return new UnsupportedWalRecord(ENCRYPTED_DATA_RECORD_V3); // only the record type is passed; no V3 payload is built here
+ }
+
+ /** Builds an {@link MvccDataRecord} with an empty entry list. */
public static MvccDataRecord buildMvccDataRecord() {
return new MvccDataRecord(Collections.emptyList(), 1);
}
diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java
index 49edc6c..25e404f 100644
--- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java
+++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java
@@ -65,7 +65,8 @@ class DataEntryWrapper extends DataEntry {
dataEntry.writeVersion(),
dataEntry.expireTime(),
dataEntry.partitionId(),
- dataEntry.partitionCounter()
+ dataEntry.partitionCounter(),
+ dataEntry.primary()
);
this.source = dataEntry;
diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
index 993ae88..cbed361 100644
--- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
+++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/WalStat.java
@@ -127,7 +127,7 @@ public class WalStat {
if (type == WALRecord.RecordType.PAGE_RECORD)
registerPageSnapshot((PageSnapshot)record);
- else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.MVCC_DATA_RECORD)
+ else if (type == WALRecord.RecordType.DATA_RECORD || type == WALRecord.RecordType.DATA_RECORD_V2 || type == WALRecord.RecordType.MVCC_DATA_RECORD)
registerDataRecord((DataRecord)record);
else if (type == WALRecord.RecordType.TX_RECORD || type == WALRecord.RecordType.MVCC_TX_RECORD)
registerTxRecord((TxRecord)record);
diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java
index b5c6976..928706f 100644
--- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java
+++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterArgumentsTest.java
@@ -391,7 +391,7 @@ public class IgniteWalConverterArgumentsTest extends GridCommonAbstractTest {
"binaryMetadataFileStoreDir=" + binaryMetadataDir.getAbsolutePath(),
"marshallerMappingFileStoreDir=" + marshallerDir.getAbsolutePath(),
"keepBinary=false",
- "recordTypes=DATA_RECORD,TX_RECORD",
+ "recordTypes=DATA_RECORD_V2,TX_RECORD",
"walTimeFromMillis=1575158400000",
"walTimeToMillis=1577836740999",
"recordContainsText=search string",
@@ -406,7 +406,7 @@ public class IgniteWalConverterArgumentsTest extends GridCommonAbstractTest {
Assert.assertEquals(binaryMetadataDir, parseArgs.getBinaryMetadataFileStoreDir());
Assert.assertEquals(marshallerDir, parseArgs.getMarshallerMappingFileStoreDir());
Assert.assertFalse(parseArgs.isKeepBinary());
- Assert.assertTrue(parseArgs.getRecordTypes().contains(WALRecord.RecordType.DATA_RECORD));
+ Assert.assertTrue(parseArgs.getRecordTypes().contains(WALRecord.RecordType.DATA_RECORD_V2));
Assert.assertTrue(parseArgs.getRecordTypes().contains(WALRecord.RecordType.TX_RECORD));
Assert.assertEquals(1575158400000L, (long)parseArgs.getFromTime());
Assert.assertEquals(1577836740999L, (long)parseArgs.getToTime());
diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java
index bb4cab9..487f791 100644
--- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java
+++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java
@@ -298,7 +298,8 @@ public class IgniteWalConverterSensitiveDataTest extends GridCommonAbstractTest
new GridCacheVersion(),
0,
0,
- 0
+ 0,
+ false
);
byte[] sensitiveDataBytes = SENSITIVE_DATA_VALUE_PREFIX.getBytes(StandardCharsets.UTF_8);
diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java
index 896dc67..063e08f 100644
--- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java
+++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java
@@ -316,7 +316,7 @@ public class IgniteWalConverterTest extends GridCommonAbstractTest {
final int len = Integer.reverseBytes(raf.readInt());
- if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD.index()) {
+ if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD_V2.index()) {
int i = 0;
int b;
@@ -426,7 +426,7 @@ public class IgniteWalConverterTest extends GridCommonAbstractTest {
if (recordTypeIndex > 0) {
recordTypeIndex--;
- if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD.index()) {
+ if (recordTypeIndex == WALRecord.RecordType.DATA_RECORD_V2.index()) {
find++;
if (find == 2) {