You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/11/13 07:41:36 UTC
[2/2] ignite git commit: IGNITE-10052: MVCC: fixed local node
recovery. This closes #5245. This closes #5345.
IGNITE-10052: MVCC: fixed local node recovery. This closes #5245. This closes #5345.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a719aa9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a719aa9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a719aa9c
Branch: refs/heads/master
Commit: a719aa9cafdf372aeaf067c5cecfc43b7e09989f
Parents: 2439ade
Author: AMRepo <an...@gmail.com>
Authored: Tue Nov 13 10:38:28 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 13 10:38:28 2018 +0300
----------------------------------------------------------------------
.../internal/pagemem/wal/record/DataEntry.java | 1 +
.../pagemem/wal/record/LazyDataEntry.java | 1 -
.../pagemem/wal/record/LazyMvccDataEntry.java | 149 ++++++++++
.../pagemem/wal/record/MvccDataEntry.java | 75 +++++
.../pagemem/wal/record/MvccDataRecord.java | 69 +++++
.../pagemem/wal/record/MvccTxRecord.java | 92 +++++++
.../internal/pagemem/wal/record/TxRecord.java | 34 +--
.../pagemem/wal/record/UnwrapMvccDataEntry.java | 130 +++++++++
.../internal/pagemem/wal/record/WALRecord.java | 10 +-
.../processors/cache/GridCacheMapEntry.java | 107 ++++++--
.../processors/cache/GridCacheProcessor.java | 10 +-
.../cache/IgniteCacheOffheapManager.java | 39 ++-
.../cache/IgniteCacheOffheapManagerImpl.java | 106 ++++++++
.../dht/GridDhtTxAbstractEnlistFuture.java | 15 -
.../cache/mvcc/MvccProcessorImpl.java | 62 +++--
.../processors/cache/mvcc/MvccUtils.java | 9 +-
.../processors/cache/mvcc/txlog/TxLog.java | 37 ++-
.../GridCacheDatabaseSharedManager.java | 104 ++++++-
.../persistence/GridCacheOffheapManager.java | 8 +
.../reader/StandaloneWalRecordsIterator.java | 70 +++--
.../wal/serializer/RecordDataV1Serializer.java | 26 +-
.../wal/serializer/RecordDataV2Serializer.java | 141 +++++++++-
.../wal/serializer/TxRecordSerializer.java | 132 ++++++++-
.../cache/transactions/IgniteTxAdapter.java | 28 +-
.../cache/transactions/IgniteTxHandler.java | 12 +-
.../cache/transactions/IgniteTxManager.java | 80 +++++-
.../cache/tree/mvcc/data/MvccUpdateDataRow.java | 2 +-
.../ignite/failure/IoomFailureHandlerTest.java | 28 +-
.../cache/mvcc/CacheMvccProcessorTest.java | 20 +-
.../cache/mvcc/CacheMvccTxFailoverTest.java | 272 +++++++++++++++++++
.../cache/mvcc/CacheMvccVacuumTest.java | 6 +
.../testsuites/IgniteCacheMvccTestSuite.java | 2 +
.../ignite/development/utils/WalStat.java | 4 +-
33 files changed, 1646 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index d13a68a..dd05726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -61,6 +61,7 @@ public class DataEntry {
@GridToStringInclude
protected long partCnt;
+ /** Constructor. */
private DataEntry() {
// No-op, used from factory methods.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
index 6b56da5..403d778 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
@@ -143,5 +143,4 @@ public class LazyDataEntry extends DataEntry {
public byte[] getValBytes() {
return valBytes;
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java
new file mode 100644
index 0000000..15b1468
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+
+/**
+ * Represents Data Entry ({@link #key}, {@link #val value}) pair update {@link #op operation}. <br>
+ * This Data entry was not converted to key, value pair during record deserialization.
+ */
+public class LazyMvccDataEntry extends MvccDataEntry {
+ /** */
+ private GridCacheSharedContext cctx;
+
+ /** Data Entry key type code. See {@link CacheObject} for built-in value type codes */
+ private byte keyType;
+
+ /** Key value bytes. */
+ private byte[] keyBytes;
+
+ /** Data Entry Value type code. See {@link CacheObject} for built-in value type codes */
+ private byte valType;
+
+ /** Value value bytes. */
+ private byte[] valBytes;
+
+ /**
+ * @param cctx Shared context.
+ * @param cacheId Cache ID.
+ * @param keyType Object type code for Key.
+ * @param keyBytes Data Entry Key value bytes.
+ * @param valType Object type code for Value.
+ * @param valBytes Data Entry Value value bytes.
+ * @param op Operation.
+ * @param nearXidVer Near transaction version.
+ * @param writeVer Write version.
+ * @param expireTime Expire time.
+ * @param partId Partition ID.
+ * @param partCnt Partition counter.
+ * @param mvccVer Mvcc version.
+ */
+ public LazyMvccDataEntry(
+ GridCacheSharedContext cctx,
+ int cacheId,
+ byte keyType,
+ byte[] keyBytes,
+ byte valType,
+ byte[] valBytes,
+ GridCacheOperation op,
+ GridCacheVersion nearXidVer,
+ GridCacheVersion writeVer,
+ long expireTime,
+ int partId,
+ long partCnt,
+ MvccVersion mvccVer
+ ) {
+ super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, mvccVer);
+
+ this.cctx = cctx;
+ this.keyType = keyType;
+ this.keyBytes = keyBytes;
+ this.valType = valType;
+ this.valBytes = valBytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject key() {
+ try {
+ if (key == null) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+ if (cacheCtx == null)
+ throw new IgniteException("Failed to find cache context for the given cache ID: " + cacheId);
+
+ IgniteCacheObjectProcessor co = cctx.kernalContext().cacheObjects();
+
+ key = co.toKeyCacheObject(cacheCtx.cacheObjectContext(), keyType, keyBytes);
+
+ if (key.partition() == -1)
+ key.partition(partId);
+ }
+
+ return key;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject value() {
+ if (val == null && valBytes != null) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+ if (cacheCtx == null)
+ throw new IgniteException("Failed to find cache context for the given cache ID: " + cacheId);
+
+ IgniteCacheObjectProcessor co = cctx.kernalContext().cacheObjects();
+
+ val = co.toCacheObject(cacheCtx.cacheObjectContext(), valType, valBytes);
+ }
+
+ return val;
+ }
+
+ /** @return Data Entry Key type code. See {@link CacheObject} for built-in value type codes */
+ public byte getKeyType() {
+ return keyType;
+ }
+
+ /** @return Key value bytes. */
+ public byte[] getKeyBytes() {
+ return keyBytes;
+ }
+
+ /** @return Data Entry Value type code. See {@link CacheObject} for built-in value type codes */
+ public byte getValType() {
+ return valType;
+ }
+
+ /** @return Value value bytes. */
+ public byte[] getValBytes() {
+ return valBytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
new file mode 100644
index 0000000..6480f6f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents Data Entry ({@link #key}, {@link #val value}) pair for mvcc update {@link #op operation} in WAL log.
+ */
+public class MvccDataEntry extends DataEntry {
+ /** Entry version. */
+ private MvccVersion mvccVer;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ * @param val Value or null for delete operation.
+ * @param op Operation.
+ * @param nearXidVer Near transaction version.
+ * @param writeVer Write version.
+ * @param expireTime Expire time.
+ * @param partId Partition ID.
+ * @param partCnt Partition counter.
+ * @param mvccVer Mvcc version.
+ */
+ public MvccDataEntry(
+ int cacheId,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ GridCacheOperation op,
+ GridCacheVersion nearXidVer,
+ GridCacheVersion writeVer,
+ long expireTime,
+ int partId,
+ long partCnt,
+ MvccVersion mvccVer
+ ) {
+ super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+
+ this.mvccVer = mvccVer;
+ }
+
+ /**
+ * @return Mvcc version.
+ */
+ public MvccVersion mvccVer() {
+ return mvccVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccDataEntry.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java
new file mode 100644
index 0000000..276ba1b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Logical data record with cache operation description.
+ * This record contains information about operation we want to do.
+ * Contains operation type (put, remove) and (Key, Value, Version) for each {@link MvccDataEntry}
+ */
+public class MvccDataRecord extends DataRecord {
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.MVCC_DATA_RECORD;
+ }
+
+ /**
+ * @param writeEntry Write entry.
+ */
+ public MvccDataRecord(MvccDataEntry writeEntry) {
+ this(writeEntry, U.currentTimeMillis());
+ }
+
+ /**
+ * @param writeEntries Write entries.
+ */
+ public MvccDataRecord(List<DataEntry> writeEntries) {
+ this(writeEntries, U.currentTimeMillis());
+ }
+
+ /**
+ * @param writeEntry Write entry.
+ */
+ public MvccDataRecord(MvccDataEntry writeEntry, long timestamp) {
+ this(Collections.singletonList(writeEntry), timestamp);
+ }
+
+ /**
+ * @param writeEntries Write entries.
+ * @param timestamp TimeStamp.
+ */
+ public MvccDataRecord(List<DataEntry> writeEntries, long timestamp) {
+ super(writeEntries, timestamp);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccDataRecord.class, this, "super", super.toString());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
new file mode 100644
index 0000000..82c4409
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Logical data record indented for MVCC transaction related actions.<br>
+ * This record is marker of prepare, commit, and rollback transactions.
+ */
+public class MvccTxRecord extends TxRecord {
+ /** Transaction mvcc snapshot version. */
+ private final MvccVersion mvccVer;
+
+ /**
+ * @param state Transaction state.
+ * @param nearXidVer Transaction id.
+ * @param writeVer Transaction entries write topology version.
+ * @param participatingNodes Primary -> Backup nodes compact IDs participating in transaction.
+ * @param mvccVer Transaction snapshot version.
+ */
+ public MvccTxRecord(
+ TransactionState state,
+ GridCacheVersion nearXidVer,
+ GridCacheVersion writeVer,
+ @Nullable Map<Short, Collection<Short>> participatingNodes,
+ MvccVersion mvccVer
+ ) {
+ super(state, nearXidVer, writeVer, participatingNodes);
+
+ this.mvccVer = mvccVer;
+ }
+
+ /**
+ * @param state Transaction state.
+ * @param nearXidVer Transaction id.
+ * @param writeVer Transaction entries write topology version.
+ * @param mvccVer Transaction snapshot version.
+ * @param participatingNodes Primary -> Backup nodes participating in transaction.
+ * @param ts TimeStamp.
+ */
+ public MvccTxRecord(
+ TransactionState state,
+ GridCacheVersion nearXidVer,
+ GridCacheVersion writeVer,
+ @Nullable Map<Short, Collection<Short>> participatingNodes,
+ MvccVersion mvccVer,
+ long ts
+ ) {
+ super(state, nearXidVer, writeVer, participatingNodes, ts);
+
+ this.mvccVer = mvccVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.MVCC_TX_RECORD;
+ }
+
+ /**
+ * @return Mvcc version.
+ */
+ public MvccVersion mvccVersion() {
+ return mvccVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccTxRecord.class, this, "super", super.toString());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
index 90df004..ae8d112 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
@@ -32,14 +32,14 @@ import org.jetbrains.annotations.Nullable;
public class TxRecord extends TimeStampRecord {
/** Transaction state. */
@GridToStringInclude
- private TransactionState state;
+ private final TransactionState state;
/** Global transaction identifier within cluster, assigned by transaction coordinator. */
@GridToStringInclude
- private GridCacheVersion nearXidVer;
+ private final GridCacheVersion nearXidVer;
/** Transaction entries write topology version. */
- private GridCacheVersion writeVer;
+ private final GridCacheVersion writeVer;
/**
* Transaction participating nodes.
@@ -103,13 +103,6 @@ public class TxRecord extends TimeStampRecord {
}
/**
- * @param nearXidVer Near xid version.
- */
- public void nearXidVersion(GridCacheVersion nearXidVer) {
- this.nearXidVer = nearXidVer;
- }
-
- /**
* @return DHT version.
*/
public GridCacheVersion writeVersion() {
@@ -117,13 +110,6 @@ public class TxRecord extends TimeStampRecord {
}
/**
- * @param writeVer DHT version.
- */
- public void dhtVersion(GridCacheVersion writeVer) {
- this.writeVer = writeVer;
- }
-
- /**
* @return Transaction state.
*/
public TransactionState state() {
@@ -131,26 +117,12 @@ public class TxRecord extends TimeStampRecord {
}
/**
- * @param state Transaction state.
- */
- public void state(TransactionState state) {
- this.state = state;
- }
-
- /**
* @return Primary -> backup participating nodes compact IDs.
*/
public Map<Short, Collection<Short>> participatingNodes() {
return participatingNodes;
}
- /**
- * @param participatingNodeIds Primary -> backup participating nodes compact IDs.
- */
- public void participatingNodes(Map<Short, Collection<Short>> participatingNodeIds) {
- this.participatingNodes = participatingNodeIds;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TxRecord.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java
new file mode 100644
index 0000000..25d7a6e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Data Entry for automatic unwrapping key and value from Mvcc Data Entry
+ */
+public class UnwrapMvccDataEntry extends MvccDataEntry {
+ /** Cache object value context. Context is used for unwrapping objects. */
+ private final CacheObjectValueContext cacheObjValCtx;
+
+ /** Keep binary. This flag disables converting of non primitive types (BinaryObjects). */
+ private boolean keepBinary;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param key Key.
+ * @param val Value or null for delete operation.
+ * @param op Operation.
+ * @param nearXidVer Near transaction version.
+ * @param writeVer Write version.
+ * @param expireTime Expire time.
+ * @param partId Partition ID.
+ * @param partCnt Partition counter.
+ * @param mvccVer Mvcc version.
+ * @param cacheObjValCtx cache object value context for unwrapping objects.
+ * @param keepBinary disable unwrapping for non primitive objects, Binary Objects would be returned instead.
+ */
+ public UnwrapMvccDataEntry(
+ final int cacheId,
+ final KeyCacheObject key,
+ final CacheObject val,
+ final GridCacheOperation op,
+ final GridCacheVersion nearXidVer,
+ final GridCacheVersion writeVer,
+ final long expireTime,
+ final int partId,
+ final long partCnt,
+ MvccVersion mvccVer,
+ final CacheObjectValueContext cacheObjValCtx,
+ final boolean keepBinary) {
+ super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, mvccVer);
+
+ this.cacheObjValCtx = cacheObjValCtx;
+ this.keepBinary = keepBinary;
+ }
+
+ /**
+ * Unwraps key value from cache key object into primitive boxed type or source class. If client classes were used
+ * in key, call of this method requires classes to be available in classpath.
+ *
+ * @return Key which was placed into cache. Or null if failed to convert.
+ */
+ public Object unwrappedKey() {
+ try {
+ if (keepBinary && key instanceof BinaryObject)
+ return key;
+
+ Object unwrapped = key.value(cacheObjValCtx, false);
+
+ if (unwrapped instanceof BinaryObject) {
+ if (keepBinary)
+ return unwrapped;
+ unwrapped = ((BinaryObject)unwrapped).deserialize();
+ }
+
+ return unwrapped;
+ }
+ catch (Exception e) {
+ cacheObjValCtx.kernalContext().log(UnwrapMvccDataEntry.class)
+ .error("Unable to convert key [" + key + "]", e);
+
+ return null;
+ }
+ }
+
+ /**
+ * Unwraps value value from cache value object into primitive boxed type or source class. If client classes were
+ * used in key, call of this method requires classes to be available in classpath.
+ *
+ * @return Value which was placed into cache. Or null for delete operation or for failure.
+ */
+ public Object unwrappedValue() {
+ try {
+ if (val == null)
+ return null;
+
+ if (keepBinary && val instanceof BinaryObject)
+ return val;
+
+ return val.value(cacheObjValCtx, false);
+ }
+ catch (Exception e) {
+ cacheObjValCtx.kernalContext().log(UnwrapMvccDataEntry.class)
+ .error("Unable to convert value [" + value() + "]", e);
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return getClass().getSimpleName() + "[k = " + unwrappedKey() + ", v = [ "
+ + unwrappedValue()
+ + "], super = ["
+ + super.toString() + "]]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index e22adc0..ad8a2a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -198,8 +198,14 @@ public abstract class WALRecord {
/** Encrypted WAL-record. */
ENCRYPTED_RECORD,
- /** Ecnrypted data record */
- ENCRYPTED_DATA_RECORD;
+ /** Ecnrypted data record. */
+ ENCRYPTED_DATA_RECORD,
+
+ /** Mvcc data record. */
+ MVCC_DATA_RECORD,
+
+ /** Mvcc Tx state change record. */
+ MVCC_TX_RECORD;
/** */
private static final RecordType[] VALS = RecordType.values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index cc0a78e..fa4cc98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.UnregisteredClassException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -1216,7 +1218,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) {
- logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
+ logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
cctx.cacheId(),
key,
val,
@@ -1226,7 +1228,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
newVer,
expireTime,
key.partition(),
- 0L)));
+ 0L,
+ mvccVer)
+ ));
}
update(val, expireTime, ttl, newVer, true);
@@ -1343,7 +1347,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = logTxUpdate(tx, null, 0, 0L);
+ logPtr = logMvccUpdate(tx, null, 0, 0L, mvccVer);
update(null, 0, 0, newVer, true);
@@ -3433,17 +3437,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateCntr = nextPartitionCounter(topVer, true, null);
if (walEnabled) {
- cctx.shared().wal().log(new DataRecord(new DataEntry(
- cctx.cacheId(),
- key,
- val,
- val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE,
- null,
- ver,
- expireTime,
- partition(),
- updateCntr
- )));
+ if (cctx.mvccEnabled()) {
+ cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
+ cctx.cacheId(),
+ key,
+ val,
+ val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE,
+ null,
+ ver,
+ expireTime,
+ partition(),
+ updateCntr,
+ mvccVer
+ )));
+ } else {
+ cctx.shared().wal().log(new DataRecord(new DataEntry(
+ cctx.cacheId(),
+ key,
+ val,
+ val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE,
+ null,
+ ver,
+ expireTime,
+ partition(),
+ updateCntr
+ )));
+ }
}
drReplicate(drType, val, ver, topVer);
@@ -4297,7 +4316,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
*/
protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
throws IgniteCheckedException {
- assert cctx.transactional();
+ assert cctx.transactional() && !cctx.transactionalSnapshot();
if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked()
GridCacheOperation op;
@@ -4322,6 +4341,43 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
+ * @param tx Transaction.
+ * @param val Value.
+ * @param expireTime Expire time (or 0 if not applicable). *
+ * @param updCntr Update counter.
+ * @param mvccVer Mvcc version.
+ * @throws IgniteCheckedException In case of log failure.
+ */
+ protected WALPointer logMvccUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
+ MvccSnapshot mvccVer)
+ throws IgniteCheckedException {
+ assert mvccVer != null;
+ assert cctx.transactionalSnapshot();
+
+ if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked()
+ GridCacheOperation op;
+ if (val == null)
+ op = GridCacheOperation.DELETE;
+ else
+ op = this.val == null ? GridCacheOperation.CREATE : GridCacheOperation.UPDATE;
+
+ return cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
+ cctx.cacheId(),
+ key,
+ val,
+ op,
+ tx.nearXidVersion(),
+ tx.writeVersion(),
+ expireTime,
+ key.partition(),
+ updCntr,
+ mvccVer)));
+ }
+ else
+ return null;
+ }
+
+ /**
* Removes value from offheap.
*
* @throws IgniteCheckedException If failed.
@@ -5159,16 +5215,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
- cctx.cacheId(),
- entry.key(),
- null,
- DELETE,
- tx.nearXidVersion(),
- tx.writeVersion(),
- 0,
- entry.key().partition(),
- 0)));
+ entry.logMvccUpdate(tx, null, 0, 0, mvccVer);
entry.update(null, 0, 0, newVer, true);
@@ -5510,7 +5557,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
+ logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
cctx.cacheId(),
entry.key(),
val,
@@ -5520,7 +5567,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
newVer,
expireTime,
entry.key().partition(),
- 0L)));
+ 0L,
+ mvccVer)));
entry.update(val, expireTime, ttl, newVer, true);
@@ -6707,7 +6755,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheVersion ver = tx.writeVersion();
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
+ logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
cctx.cacheId(),
key,
val,
@@ -6716,7 +6764,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ver,
CU.EXPIRE_TIME_ETERNAL,
key.partition(),
- 0L)));
+ 0L,
+ mvccVer)));
update(val, expireTime, ttl, ver, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f4e5b1b..2e2fa47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1498,7 +1498,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
pluginMgr.validate();
- if (cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+ if (!recoveryMode && cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
sharedCtx.coordinators().ensureStarted();
sharedCtx.jta().registerCache(cfg);
@@ -2306,6 +2306,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cacheContext.finishRecovery(cacheStartVer, updatedDescriptor.cacheConfiguration().isStatisticsEnabled());
+ if (cacheContext.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+ sharedCtx.coordinators().ensureStarted();
+
onKernalStart(cacheContext.cache());
if (log.isInfoEnabled())
@@ -5446,11 +5449,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
- // Skip MVCC caches.
- // TODO: https://issues.apache.org/jira/browse/IGNITE-10052
- if (cacheDescriptor.cacheConfiguration().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
- continue;
-
startCacheInRecoveryMode(cacheDescriptor);
querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 66d927f..d311708 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -23,8 +23,8 @@ import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -412,6 +412,25 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
* @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param part Partition.
+ * @param mvccVer Mvcc version.
+ * @throws IgniteCheckedException If failed.
+ */
+ void mvccApplyUpdate(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ GridDhtLocalPartition part,
+ MvccVersion mvccVer) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
* @param partId Partition number.
* @param part Partition.
* @throws IgniteCheckedException If failed.
@@ -923,6 +942,24 @@ public interface IgniteCacheOffheapManager {
public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
/**
+ *
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param mvccVer Mvcc version.
+ * @throws IgniteCheckedException
+ */
+ void mvccApplyUpdate(GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer
+ ) throws IgniteCheckedException;
+
+ /**
* @param cctx Cache context.
* @param key Key.
* @param partId Partition number.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index f00c784..5f467b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -51,6 +51,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -626,6 +628,24 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public void mvccApplyUpdate(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ GridDhtLocalPartition part,
+ MvccVersion mvccVer) throws IgniteCheckedException {
+
+ dataStore(part).mvccApplyUpdate(cctx,
+ key,
+ val,
+ ver,
+ expireTime,
+ mvccVer);
+ }
+
+ /** {@inheritDoc} */
@Override public void remove(
GridCacheContext cctx,
KeyCacheObject key,
@@ -2512,6 +2532,92 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
}
+ /** {@inheritDoc} */
+ @Override public void mvccApplyUpdate(GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ MvccVersion mvccVer
+ ) throws IgniteCheckedException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+ CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+ // Make sure value bytes initialized.
+ key.valueBytes(coCtx);
+
+ if (val != null)
+ val.valueBytes(coCtx);
+
+ MvccSnapshotWithoutTxs mvccSnapshot = new MvccSnapshotWithoutTxs(mvccVer.coordinatorVersion(),
+ mvccVer.counter(), mvccVer.operationCounter(), MvccUtils.MVCC_COUNTER_NA);
+
+ MvccUpdateDataRow updateRow = new MvccUpdateDataRow(
+ cctx,
+ key,
+ val,
+ ver,
+ partId,
+ 0L,
+ mvccSnapshot,
+ null,
+ null,
+ false,
+ false,
+ false,
+ false,
+ false,
+ false);
+
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
+ dataTree.visit(new MvccMaxSearchRow(cacheId, key), new MvccMinSearchRow(cacheId, key), updateRow);
+
+ ResultType res = updateRow.resultType();
+
+ assert res == ResultType.PREV_NULL || res == ResultType.PREV_NOT_NULL : res;
+
+ if (res == ResultType.PREV_NOT_NULL) {
+ CacheDataRow oldRow = updateRow.oldRow();
+
+ assert oldRow != null && oldRow.link() != 0 : oldRow;
+
+ rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot);
+ }
+
+ if (val != null) {
+ if (!grp.storeCacheIdInDataPage() && updateRow.cacheId() != CU.UNDEFINED_CACHE_ID) {
+ updateRow.cacheId(CU.UNDEFINED_CACHE_ID);
+
+ rowStore.addRow(updateRow);
+
+ updateRow.cacheId(cctx.cacheId());
+ }
+ else
+ rowStore.addRow(updateRow);
+
+ boolean old = dataTree.putx(updateRow);
+
+ assert !old;
+
+ GridCacheQueryManager qryMgr = cctx.queries();
+
+ if (qryMgr.enabled())
+ qryMgr.store(updateRow, null, true);
+
+ cleanup(cctx, updateRow.cleanupRows());
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/**
* @param cctx Cache context.
* @param newRow New row.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 68669b7..cfa8eb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -36,7 +36,6 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -181,9 +180,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
/** Batches already sent to remotes, but their acks are not received yet. */
private ConcurrentMap<UUID, ConcurrentMap<Integer, Batch>> pending;
- /** */
- private WALPointer walPtr;
-
/** Do not send DHT requests to near node. */
protected boolean skipNearNodeUpdates;
@@ -531,12 +527,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
}
if (!hasNext0()) {
- if (walPtr != null && !cctx.tm().logTxRecords()) {
- cctx.shared().wal().flush(walPtr, true);
-
- walPtr = null; // Avoid additional flushing.
- }
-
if (!F.isEmpty(batches)) {
// Flush incomplete batches.
// Need to skip batches for nodes where first request (contains tx info) is still in-flight.
@@ -629,11 +619,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
assert updRes != null && updRes.updateFuture() == null;
- WALPointer ptr0 = updRes.loggedPointer();
-
- if (ptr0 != null)
- walPtr = ptr0;
-
onEntryProcessed(entry.key(), updRes);
if (!updRes.success())
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 07fbb9b..851a5ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -288,10 +289,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public void ensureStarted() throws IgniteCheckedException {
- if (!ctx.clientNode() && txLog == null) {
+ if (!ctx.clientNode()) {
assert mvccEnabled && mvccSupported;
- txLog = new TxLog(ctx, ctx.cache().context().database());
+ if (txLog == null)
+ txLog = new TxLog(ctx, ctx.cache().context().database());
startVacuumWorkers();
@@ -327,6 +329,15 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
txLogPageStoreInit(mgr);
+
+ boolean hasMvccCaches = ctx.cache().persistentCaches().stream()
+ .anyMatch(c -> c.cacheConfiguration().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT);
+
+ if (hasMvccCaches) {
+ txLog = new TxLog(ctx, mgr);
+
+ mvccEnabled = true;
+ }
}
/**
@@ -1260,15 +1271,14 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (log.isDebugEnabled())
log.debug("Vacuum completed. " + metrics);
- }
- catch (NodeStoppingException ignored) {
- if (log.isDebugEnabled())
- log.debug("Cannot complete vacuum (node is stopping).");
-
- metrics = new VacuumMetrics();
- }
- catch (Throwable e) {
- ex = new GridClosureException(e);
+ } catch (Throwable e) {
+ if (X.hasCause(e, NodeStoppingException.class)) {
+ if (log.isDebugEnabled())
+ log.debug("Cannot complete vacuum (node is stopping).");
+
+ metrics = new VacuumMetrics();
+ } else
+ ex = new GridClosureException(e);
}
res.onDone(metrics, ex);
@@ -2399,27 +2409,27 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
entry = cctx.cache().entryEx(key);
}
- cctx.shared().database().checkpointReadLock();
-
int cleaned = 0;
try {
- if (cleanupRows != null)
- cleaned = part.dataStore().cleanup(cctx, cleanupRows);
+ cctx.shared().database().checkpointReadLock();
- if (rest != null) {
- if (rest.getClass() == ArrayList.class) {
- for (MvccDataRow row : ((List<MvccDataRow>)rest)) {
- part.dataStore().updateTxState(cctx, row);
- }
+ try {
+ if (cleanupRows != null)
+ cleaned = part.dataStore().cleanup(cctx, cleanupRows);
+
+ if (rest != null) {
+ if (rest.getClass() == ArrayList.class) {
+ for (MvccDataRow row : ((List<MvccDataRow>) rest)) {
+ part.dataStore().updateTxState(cctx, row);
+ }
+ } else
+ part.dataStore().updateTxState(cctx, (MvccDataRow) rest);
}
- else
- part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
+ } finally {
+ cctx.shared().database().checkpointReadUnlock();
}
- }
- finally {
- cctx.shared().database().checkpointReadUnlock();
-
+ } finally {
entry.unlockEntry();
cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 972d4d9..f29e23f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -184,7 +184,14 @@ public class MvccUtils {
if ((mvccOpCntr & MVCC_HINTS_MASK) != 0)
return (byte)(mvccOpCntr >>> MVCC_HINTS_BIT_OFF);
- return proc.state(mvccCrd, mvccCntr);
+ byte state = proc.state(mvccCrd, mvccCntr);
+
+ if ((state == TxState.NA || state == TxState.PREPARED)
+ && (proc.currentCoordinator() == null // Recovery from WAL.
+ || mvccCrd < proc.currentCoordinator().coordinatorVersion()))
+ state = TxState.ABORTED;
+
+ return state;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
index 03eb659..ca3053c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -236,18 +236,13 @@ public class TxLog implements DbCheckpointListener {
* @throws IgniteCheckedException If failed.
*/
public void put(TxKey key, byte state, boolean primary) throws IgniteCheckedException {
+ assert mgr.checkpointLockIsHeldByThread();
+
Sync sync = syncObject(key);
try {
- mgr.checkpointReadLock();
-
- try {
- synchronized (sync) {
- tree.invoke(key, null, new TxLogUpdateClosure(key.major(), key.minor(), state, primary));
- }
- }
- finally {
- mgr.checkpointReadUnlock();
+ synchronized (sync) {
+ tree.invoke(key, null, new TxLogUpdateClosure(key.major(), key.minor(), state, primary));
}
} finally {
evict(key, sync);
@@ -267,8 +262,14 @@ public class TxLog implements DbCheckpointListener {
tree.iterate(LOWEST, clo, clo);
if (clo.rows != null) {
- for (TxKey row : clo.rows) {
- remove(row);
+ mgr.checkpointReadLock();
+
+ try {
+ for (TxKey row : clo.rows)
+ remove(row);
+ }
+ finally {
+ mgr.checkpointReadUnlock();
}
}
}
@@ -278,17 +279,11 @@ public class TxLog implements DbCheckpointListener {
Sync sync = syncObject(key);
try {
- mgr.checkpointReadLock();
-
- try {
- synchronized (sync) {
- tree.removex(key);
- }
+ synchronized (sync) {
+ tree.removex(key);
}
- finally {
- mgr.checkpointReadUnlock();
- }
- } finally {
+ }
+ finally {
evict(key, sync);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 94ac100..393e137 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -95,6 +95,8 @@ import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
@@ -112,6 +114,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
@@ -157,6 +160,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
@@ -705,7 +709,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
metaStorage = createMetastorage(true);
- applyLogicalUpdates(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false);
+ applyLogicalUpdates(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false, true);
fillWalDisabledGroups();
@@ -2006,7 +2010,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
RestoreLogicalState logicalState = applyLogicalUpdates(
status,
g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g),
- true
+ true,
+ false
);
// Restore state for all groups.
@@ -2325,6 +2330,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
switch (rec.type()) {
+ case MVCC_DATA_RECORD:
case DATA_RECORD:
checkpointReadLock();
@@ -2361,6 +2367,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
+ case MVCC_TX_RECORD:
+ try {
+ MvccTxRecord txRecord = (MvccTxRecord)rec;
+
+ byte txState = convertToTxState(txRecord.state());
+
+ cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ break;
+
default:
// Skip other records.
}
@@ -2389,7 +2409,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private RestoreLogicalState applyLogicalUpdates(
CheckpointStatus status,
Predicate<Integer> cacheGroupsPredicate,
- boolean skipFieldLookup
+ boolean skipFieldLookup,
+ boolean metaStoreOnly
) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("Applying lost cache updates since last checkpoint record [lastMarked="
@@ -2414,6 +2435,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
switch (rec.type()) {
+ case MVCC_DATA_RECORD:
case DATA_RECORD:
DataRecord dataRec = (DataRecord)rec;
@@ -2485,6 +2507,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
+ case MVCC_TX_RECORD:
+ if (metaStoreOnly)
+ continue;
+
+ MvccTxRecord txRecord = (MvccTxRecord)rec;
+
+ byte txState = convertToTxState(txRecord.state());
+
+ cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
+
+ break;
+
default:
// Skip other records.
}
@@ -2503,6 +2537,28 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Convert {@link TransactionState} to Mvcc {@link TxState}.
+ *
+ * @param state TransactionState.
+ * @return TxState.
+ */
+ private byte convertToTxState(TransactionState state) {
+ switch (state) {
+ case PREPARED:
+ return TxState.PREPARED;
+
+ case COMMITTED:
+ return TxState.COMMITTED;
+
+ case ROLLED_BACK:
+ return TxState.ABORTED;
+
+ default:
+ throw new IllegalStateException("Unsupported TxState.");
+ }
+ }
+
+ /**
* Wal truncate callBack.
*
* @param highBound WALPointer.
@@ -2530,14 +2586,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
switch (dataEntry.op()) {
case CREATE:
case UPDATE:
- cacheCtx.offheap().update(
- cacheCtx,
- dataEntry.key(),
- dataEntry.value(),
- dataEntry.writeVersion(),
- 0L,
- locPart,
- null);
+ if (dataEntry instanceof MvccDataEntry) {
+ cacheCtx.offheap().mvccApplyUpdate(
+ cacheCtx,
+ dataEntry.key(),
+ dataEntry.value(),
+ dataEntry.writeVersion(),
+ 0L,
+ locPart,
+ ((MvccDataEntry)dataEntry).mvccVer());
+ }
+ else {
+ cacheCtx.offheap().update(
+ cacheCtx,
+ dataEntry.key(),
+ dataEntry.value(),
+ dataEntry.writeVersion(),
+ 0L,
+ locPart,
+ null);
+ }
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
@@ -2545,7 +2613,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
case DELETE:
- cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
+ if (dataEntry instanceof MvccDataEntry) {
+ cacheCtx.offheap().mvccApplyUpdate(
+ cacheCtx,
+ dataEntry.key(),
+ null,
+ dataEntry.writeVersion(),
+ 0L,
+ locPart,
+ ((MvccDataEntry)dataEntry).mvccVer());
+ }
+ else
+ cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
@@ -4669,6 +4748,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
switch (rec.type()) {
case METASTORE_DATA_RECORD:
+ case MVCC_DATA_RECORD:
case DATA_RECORD:
if (skipDataRecords)
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5f6511d..48e86e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1929,6 +1929,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public void mvccApplyUpdate(GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver,
+ long expireTime, MvccVersion mvccVer) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(false);
+
+ delegate.mvccApplyUpdate(cctx, key, val, ver, expireTime, mvccVer);
+ }
+
+ /** {@inheritDoc} */
@Override public CacheDataRow createRow(
GridCacheContext cctx,
KeyCacheObject key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index be5f55a..e6191dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -31,7 +31,10 @@ import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.UnwrapMvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -43,13 +46,13 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRe
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer.EncryptedDataEntry;
import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer.EncryptedDataEntry;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
@@ -350,11 +353,11 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
}
/** {@inheritDoc} */
- @NotNull @Override protected WALRecord postProcessRecord(@NotNull final WALRecord rec) {
+ @Override protected @NotNull WALRecord postProcessRecord(@NotNull final WALRecord rec) {
GridKernalContext kernalCtx = sharedCtx.kernalContext();
IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
- if (processor != null && rec.type() == RecordType.DATA_RECORD) {
+ if (processor != null && (rec.type() == RecordType.DATA_RECORD || rec.type() == RecordType.MVCC_DATA_RECORD)) {
try {
return postProcessDataRecord((DataRecord)rec, kernalCtx, processor);
}
@@ -409,7 +412,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
postProcessedEntries.add(postProcessedEntry);
}
- DataRecord res = new DataRecord(postProcessedEntries, dataRec.timestamp());
+ DataRecord res = dataRec instanceof MvccDataRecord ?
+ new MvccDataRecord(postProcessedEntries, dataRec.timestamp()) :
+ new DataRecord(postProcessedEntries, dataRec.timestamp());
res.size(dataRec.size());
res.position(dataRec.position());
@@ -426,7 +431,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
* @return post precessed entry
* @throws IgniteCheckedException if failed
*/
- @NotNull private DataEntry postProcessDataEntry(
+ private @NotNull DataEntry postProcessDataEntry(
final IgniteCacheObjectProcessor processor,
final CacheObjectContext fakeCacheObjCtx,
final DataEntry dataEntry) throws IgniteCheckedException {
@@ -457,18 +462,47 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
val = dataEntry.value();
}
- return new UnwrapDataEntry(
- dataEntry.cacheId(),
- key,
- val,
- dataEntry.op(),
- dataEntry.nearXidVersion(),
- dataEntry.writeVersion(),
- dataEntry.expireTime(),
- dataEntry.partitionId(),
- dataEntry.partitionCounter(),
- fakeCacheObjCtx,
- keepBinary || marshallerMappingFileStoreDir == null);
+ return unwrapDataEntry(fakeCacheObjCtx, dataEntry, key, val, marshallerMappingFileStoreDir);
+ }
+
+ /**
+ * Unwrap data entry.
+ * @param coCtx CacheObject context.
+ * @param dataEntry Data entry.
+ * @param key Entry key.
+ * @param val Entry value.
+ * @param marshallerMappingFileStoreDir Marshaller directory.
+ * @return Unwrapped entry.
+ */
+ private @NotNull DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
+ KeyCacheObject key, CacheObject val, File marshallerMappingFileStoreDir) {
+ if (dataEntry instanceof MvccDataEntry)
+ return new UnwrapMvccDataEntry(
+ dataEntry.cacheId(),
+ key,
+ val,
+ dataEntry.op(),
+ dataEntry.nearXidVersion(),
+ dataEntry.writeVersion(),
+ dataEntry.expireTime(),
+ dataEntry.partitionId(),
+ dataEntry.partitionCounter(),
+ ((MvccDataEntry)dataEntry).mvccVer(),
+ coCtx,
+ keepBinary || marshallerMappingFileStoreDir == null);
+ else
+ return new UnwrapDataEntry(
+ dataEntry.cacheId(),
+ key,
+ val,
+ dataEntry.op(),
+ dataEntry.nearXidVersion(),
+ dataEntry.writeVersion(),
+ dataEntry.expireTime(),
+ dataEntry.partitionId(),
+ dataEntry.partitionCounter(),
+ coCtx,
+ keepBinary || marshallerMappingFileStoreDir == null);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 0ad92e3..a097361 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
@@ -46,6 +45,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateTxStateHintRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
@@ -68,7 +68,6 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuc
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord;
@@ -106,6 +105,7 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
@@ -122,8 +122,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
/** Length of HEADER record data. */
static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4;
- /** Cache shared context */
- private final GridCacheSharedContext cctx;
+ /** Cache shared context. */
+ protected final GridCacheSharedContext cctx;
/** Size of page used for PageMemory regions. */
private final int pageSize;
@@ -131,8 +131,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
/** Size of page without encryption overhead. */
private final int realPageSize;
- /** Cache object processor to reading {@link DataEntry DataEntries} */
- private final IgniteCacheObjectProcessor co;
+ /** Cache object processor to reading {@link DataEntry DataEntries}. */
+ protected final IgniteCacheObjectProcessor co;
/** Serializer of {@link TxRecord} records. */
private TxRecordSerializer txRecordSerializer;
@@ -1117,7 +1117,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
throw new EOFException("END OF SEGMENT");
case TX_RECORD:
- res = txRecordSerializer.read(in);
+ res = txRecordSerializer.readTx(in);
break;
@@ -1789,7 +1789,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
* @param ver Version to write.
* @param allowNull Is {@code null}version allowed.
*/
- private static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
+ static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
CacheVersionIO.write(buf, ver, allowNull);
}
@@ -1971,7 +1971,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
* @param allowNull Is {@code null}version allowed.
* @return Read cache version.
*/
- private GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
+ GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
// To be able to read serialization protocol version.
in.ensure(1);
@@ -1992,7 +1992,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
* @return Full data record size.
* @throws IgniteCheckedException If failed to obtain the length of one of the entries.
*/
- private int dataSize(DataRecord dataRec) throws IgniteCheckedException {
+ protected int dataSize(DataRecord dataRec) throws IgniteCheckedException {
boolean encrypted = isDataRecordEncrypted(dataRec);
int sz = 0;
@@ -2018,7 +2018,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
* @return Entry size.
* @throws IgniteCheckedException If failed to get key or value bytes length.
*/
- private int entrySize(DataEntry entry) throws IgniteCheckedException {
+ protected int entrySize(DataEntry entry) throws IgniteCheckedException {
GridCacheContext cctx = this.cctx.cacheContext(entry.cacheId());
CacheObjectContext coCtx = cctx.cacheObjectContext();
@@ -2058,7 +2058,11 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
return size;
}
+ /**
+ * Represents encrypted Data Entry ({@link #key}, {@link #val value}) pair.
+ */
public static class EncryptedDataEntry extends DataEntry {
+ /** Constructor. */
EncryptedDataEntry() {
super(0, null, null, READ, null, null, 0, 0, 0);
}