You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/30 12:24:33 UTC
[27/45] ignite git commit: IGNITE-4191: MVCC and transactional SQL
support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov,
Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov,
Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java
new file mode 100644
index 0000000..c952a48
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Vacuum metrics reducer.
+ */
+public class VacuumMetricsReducer implements IgniteReducer<VacuumMetrics, VacuumMetrics> {
+ /** */
+ private static final long serialVersionUID = 7063457745963917386L;
+
+ /** */
+ private final VacuumMetrics m = new VacuumMetrics();
+
+ /** {@inheritDoc} */
+ @Override public boolean collect(@Nullable VacuumMetrics metrics) {
+ assert metrics != null;
+
+ m.addCleanupRowsCnt(metrics.cleanupRowsCount());
+ m.addScannedRowsCount(metrics.scannedRowsCount());
+ m.addSearchNanoTime(metrics.searchNanoTime());
+ m.addCleanupNanoTime(metrics.cleanupNanoTime());
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public VacuumMetrics reduce() {
+ return m;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java
new file mode 100644
index 0000000..9a0d9e2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Task for cleaning sing partition.
+ */
+public class VacuumTask extends GridFutureAdapter<VacuumMetrics> {
+ /** */
+ private final MvccSnapshot snapshot;
+
+ /** */
+ @GridToStringExclude
+ private final GridDhtLocalPartition part;
+
+ /**
+ * @param snapshot Snapshot.
+ * @param part Partition to cleanup.
+ */
+ VacuumTask(MvccSnapshot snapshot, GridDhtLocalPartition part) {
+ this.snapshot = snapshot;
+ this.part = part;
+ }
+
+ /**
+ * @return Snapshot.
+ */
+ public MvccSnapshot snapshot() {
+ return snapshot;
+ }
+
+ /**
+ * @return Partition to cleanup.
+ */
+ public GridDhtLocalPartition part() {
+ return part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VacuumTask.class, this, "partId", part.id());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java
new file mode 100644
index 0000000..0156c53
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccAckRequestQueryCntr implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long cntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccAckRequestQueryCntr() {
+ // No-op.
+ }
+
+ /**
+ * @param cntr Query counter.
+ */
+ public MvccAckRequestQueryCntr(long cntr) {
+ this.cntr = cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Counter.
+ */
+ public long counter() {
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("cntr", cntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cntr = reader.readLong("cntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccAckRequestQueryCntr.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 140;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccAckRequestQueryCntr.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java
new file mode 100644
index 0000000..7771f4d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccAckRequestQueryId implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long qryTrackerId;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccAckRequestQueryId() {
+ // No-op.
+ }
+
+ /**
+ * @param qryTrackerId Query tracker Id.
+ */
+ public MvccAckRequestQueryId(long qryTrackerId) {
+ this.qryTrackerId = qryTrackerId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Query tracker id.
+ */
+ public long queryTrackerId() {
+ return qryTrackerId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("qryTrackerId", qryTrackerId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ qryTrackerId = reader.readLong("qryTrackerId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccAckRequestQueryId.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 145;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccAckRequestQueryId.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java
new file mode 100644
index 0000000..69dfd25
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
+
+/**
+ *
+ */
+public class MvccAckRequestTx implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+ /** */
+ private long futId;
+
+ /** */
+ private long txCntr;
+
+ /** */
+ private byte flags;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccAckRequestTx() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction.
+ */
+ public MvccAckRequestTx(long futId, long txCntr) {
+ this.futId = futId;
+ this.txCntr = txCntr;
+ }
+
+ /**
+ * @return Query counter.
+ */
+ public long queryCounter() {
+ return MVCC_COUNTER_NA;
+ }
+
+ /**
+ * @return Query tracker id.
+ */
+ public long queryTrackerId() {
+ return MVCC_TRACKER_ID_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /**
+ * @return {@code True} if response message is not needed.
+ */
+ public boolean skipResponse() {
+ return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param val {@code True} if response message is not needed.
+ */
+ public void skipResponse(boolean val) {
+ if (val)
+ flags |= SKIP_RESPONSE_FLAG_MASK;
+ else
+ flags &= ~SKIP_RESPONSE_FLAG_MASK;
+ }
+
+ /**
+ * @return Counter assigned tp transaction.
+ */
+ public long txCounter() {
+ return txCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("txCntr", txCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ txCntr = reader.readLong("txCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccAckRequestTx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 137;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccAckRequestTx.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java
new file mode 100644
index 0000000..99761c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccAckRequestTxAndQueryCntr extends MvccAckRequestTx {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long qryCntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccAckRequestTxAndQueryCntr() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryCntr Counter assigned for transaction reads.
+ */
+ public MvccAckRequestTxAndQueryCntr(long futId, long txCntr, long qryCntr) {
+ super(futId, txCntr);
+
+ this.qryCntr = qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long queryCounter() {
+ return qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryCntr", qryCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryCntr = reader.readLong("qryCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccAckRequestTxAndQueryCntr.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 146;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccAckRequestTxAndQueryCntr.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java
new file mode 100644
index 0000000..89f09db
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccAckRequestTxAndQueryId extends MvccAckRequestTx {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long qryTrackerId;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccAckRequestTxAndQueryId() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryTrackerId Query tracker id.
+ */
+ public MvccAckRequestTxAndQueryId(long futId, long txCntr, long qryTrackerId) {
+ super(futId, txCntr);
+
+ this.qryTrackerId = qryTrackerId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long queryTrackerId() {
+ return qryTrackerId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryTrackerId", qryTrackerId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryTrackerId = reader.readLong("qryTrackerId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccAckRequestTxAndQueryId.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 147;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccAckRequestTxAndQueryId.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java
new file mode 100644
index 0000000..4b78c24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccActiveQueriesMessage implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private GridLongList activeQrys;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccActiveQueriesMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param activeQrys Active queries.
+ */
+ public MvccActiveQueriesMessage(GridLongList activeQrys) {
+ this.activeQrys = activeQrys;
+ }
+
+ /**
+ * @return Active queries.
+ */
+ @Nullable public GridLongList activeQueries() {
+ return activeQrys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMessage("activeQrys", activeQrys))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ activeQrys = reader.readMessage("activeQrys");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccActiveQueriesMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 149;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccActiveQueriesMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java
new file mode 100644
index 0000000..72e4c52
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccFutureResponse implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long futId;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccFutureResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ */
+ public MvccFutureResponse(long futId) {
+ this.futId = futId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccFutureResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 138;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccFutureResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java
new file mode 100644
index 0000000..6d8b3c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * Common interface for all MVCC-related messages.
+ */
+public interface MvccMessage extends Message {
+ /**
+ * @return {@code True} if should wait for coordinator initialization.
+ */
+ public boolean waitForCoordinatorInit();
+
+ /**
+ * @return {@code True} if message should be processed from NIO thread.
+ */
+ public boolean processedFromNioThread();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java
new file mode 100644
index 0000000..75d33a7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request to get MVCC snapshot for a query.
+ */
+public class MvccQuerySnapshotRequest implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long futId;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccQuerySnapshotRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ */
+ public MvccQuerySnapshotRequest(long futId) {
+ this.futId = futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccQuerySnapshotRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 139;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccQuerySnapshotRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java
new file mode 100644
index 0000000..196003c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccSnapshotResponse implements MvccMessage, MvccSnapshot, MvccLongList {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long futId;
+
+ /** */
+ private long crdVer;
+
+ /** */
+ private long cntr;
+
+ /** */
+ private int opCntr;
+
+ /** */
+ @GridDirectTransient
+ private int txsCnt;
+
+ /** */
+ private long[] txs;
+
+ /** */
+ private long cleanupVer;
+
+ /** */
+ @GridDirectTransient
+ private long tracking;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccSnapshotResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param crdVer Coordinator version.
+ * @param cntr Counter.
+ * @param opCntr Operation counter.
+ * @param cleanupVer Cleanup version.
+ * @param tracking Tracking number.
+ */
+ public void init(long futId, long crdVer, long cntr, int opCntr, long cleanupVer, long tracking) {
+ this.futId = futId;
+ this.crdVer = crdVer;
+ this.cntr = cntr;
+ this.opCntr = opCntr;
+ this.cleanupVer = cleanupVer;
+ this.tracking = tracking;
+
+ if (txsCnt > 0 && txs.length > txsCnt) // truncate if necessary
+ txs = Arrays.copyOf(txs, txsCnt);
+ }
+
+ /**
+ * @param txId Transaction counter.
+ */
+ public void addTx(long txId) {
+ if (txs == null)
+ txs = new long[4];
+ else if (txs.length == txsCnt)
+ txs = Arrays.copyOf(txs, txs.length << 1);
+
+ txs[txsCnt++] = txId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return txsCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long get(int i) {
+ return txs[i];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(long val) {
+ for (int i = 0; i < txsCnt; i++) {
+ if (txs[i] == val)
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return Tracking counter.
+ */
+ public long tracking() {
+ return tracking;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return false;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long cleanupVersion() {
+ return cleanupVer;
+ }
+
+ /** {@inheritDoc} */
+ public long counter() {
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int operationCounter() {
+ return opCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrementOperationCounter() {
+ opCntr++;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MvccLongList activeTransactions() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MvccSnapshot withoutActiveTransactions() {
+ if (txsCnt > 0)
+ return new MvccSnapshotWithoutTxs(crdVer, cntr, opCntr, cleanupVer);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long coordinatorVersion() {
+ return crdVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("cleanupVer", cleanupVer))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("cntr", cntr))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("crdVer", crdVer))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeInt("opCntr", opCntr))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeLongArray("txs", txs))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cleanupVer = reader.readLong("cleanupVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ cntr = reader.readLong("cntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ crdVer = reader.readLong("crdVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ opCntr = reader.readInt("opCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ txs = reader.readLongArray("txs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ txsCnt = txs != null ? txs.length : 0;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccSnapshotResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 141;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 6;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccSnapshotResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java
new file mode 100644
index 0000000..cd30eb8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request to get MVCC snapshot for a new transaction.
+ */
+public class MvccTxSnapshotRequest implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long futId;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccTxSnapshotRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ */
+ public MvccTxSnapshotRequest(long futId) {
+ this.futId = futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(MvccTxSnapshotRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 136;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccTxSnapshotRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
new file mode 100644
index 0000000..ae57507
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccWaitTxsRequest implements MvccMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long futId;
+
+ /** */
+ private GridLongList txs;
+
+ /**
+ *
+ */
+ public MvccWaitTxsRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txs Transactions to wait for.
+ */
+ public MvccWaitTxsRequest(long futId, GridLongList txs) {
+ assert txs != null && txs.size() > 0 : txs;
+
+ this.futId = futId;
+ this.txs = txs;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Transactions to wait for.
+ */
+ public GridLongList transactions() {
+ return txs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeMessage("txs", txs))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ txs = reader.readMessage("txs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccWaitTxsRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 142;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccWaitTxsRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java
new file mode 100644
index 0000000..92aff7b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class TxKey {
+ /** */
+ private final long major;
+
+ /** */
+ private final long minor;
+
+ /**
+ * @param major Major version.
+ * @param minor Minor version
+ */
+ public TxKey(long major, long minor) {
+ this.major = major;
+ this.minor = minor;
+ }
+
+ /**
+ * @return Major version.
+ */
+ public long major() {
+ return major;
+ }
+
+ /**
+ * @return Minor version.
+ */
+ public long minor() {
+ return minor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || o.getClass() != TxKey.class) return false;
+
+ TxKey txKey = (TxKey) o;
+
+ return major == txKey.major && minor == txKey.minor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = (int) (major ^ (major >>> 32));
+ result = 31 * result + (int) (minor ^ (minor >>> 32));
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TxKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
new file mode 100644
index 0000000..905bfc4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
+
+/**
+ *
+ */
+public class TxLog implements DbCheckpointListener {
+ /** */
+ public static final String TX_LOG_CACHE_NAME = "TxLog";
+
+ /** */
+ public static final int TX_LOG_CACHE_ID = CU.cacheId(TX_LOG_CACHE_NAME);
+
+ /** */
+ private static final TxKey LOWEST = new TxKey(0, 0);
+
+ /** */
+ private final IgniteCacheDatabaseSharedManager mgr;
+
+ /** */
+ private ReuseListImpl reuseList;
+
+ /** */
+ private TxLogTree tree;
+
+ /** */
+ private ConcurrentMap<TxKey, Sync> keyMap = new ConcurrentHashMap<>();
+
+ /**
+ *
+ * @param ctx Kernal context.
+ * @param mgr Database shared manager.
+ */
+ public TxLog(GridKernalContext ctx, IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+ this.mgr = mgr;
+
+ init(ctx);
+ }
+
+ /**
+ *
+ * @param ctx Kernal context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void init(GridKernalContext ctx) throws IgniteCheckedException {
+ if (CU.isPersistenceEnabled(ctx.config())) {
+ mgr.checkpointReadLock();
+
+ try {
+ IgniteWriteAheadLogManager wal = ctx.cache().context().wal();
+ PageMemoryEx pageMemory = (PageMemoryEx)mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();
+
+ long partMetaId = pageMemory.partitionMetaPageId(TX_LOG_CACHE_ID, 0);
+ long partMetaPage = pageMemory.acquirePage(TX_LOG_CACHE_ID, partMetaId);
+
+ long treeRoot, reuseListRoot;
+
+ boolean isNew = false;
+
+ try {
+ long pageAddr = pageMemory.writeLock(TX_LOG_CACHE_ID, partMetaId, partMetaPage);
+
+ try {
+ if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
+ // Initialize new page.
+ PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();
+
+ io.initNewPage(pageAddr, partMetaId, pageMemory.pageSize());
+
+ treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, 0, PageMemory.FLAG_DATA);
+ reuseListRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, 0, PageMemory.FLAG_DATA);
+
+ assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
+ assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
+
+ io.setTreeRoot(pageAddr, treeRoot);
+ io.setReuseListRoot(pageAddr, reuseListRoot);
+
+ if (PageHandler.isWalDeltaRecordNeeded(pageMemory, TX_LOG_CACHE_ID, partMetaId, partMetaPage, wal, null))
+ wal.log(new MetaPageInitRecord(
+ TX_LOG_CACHE_ID,
+ partMetaId,
+ io.getType(),
+ io.getVersion(),
+ treeRoot,
+ reuseListRoot
+ ));
+
+ isNew = true;
+ }
+ else {
+ PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
+
+ treeRoot = io.getTreeRoot(pageAddr);
+ reuseListRoot = io.getReuseListRoot(pageAddr);
+
+ assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA :
+ U.hexLong(treeRoot) + ", part=" + 0 + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID;
+ assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA :
+ U.hexLong(reuseListRoot) + ", part=" + 0 + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID;
+ }
+ }
+ finally {
+ pageMemory.writeUnlock(TX_LOG_CACHE_ID, partMetaId, partMetaPage, null, isNew);
+ }
+ }
+ finally {
+ pageMemory.releasePage(TX_LOG_CACHE_ID, partMetaId, partMetaPage);
+ }
+
+ reuseList = new ReuseListImpl(
+ TX_LOG_CACHE_ID,
+ TX_LOG_CACHE_NAME,
+ pageMemory,
+ wal,
+ reuseListRoot,
+ isNew);
+
+ tree = new TxLogTree(pageMemory, wal, treeRoot, reuseList, ctx.failure(), isNew);
+
+ ((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this);
+ }
+ finally {
+ mgr.checkpointReadUnlock();
+ }
+ }
+ else {
+ PageMemory pageMemory = mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();
+ ReuseList reuseList1 = mgr.reuseList(TX_LOG_CACHE_NAME);
+
+ long treeRoot;
+
+ if ((treeRoot = reuseList1.takeRecycledPage()) == 0L)
+ treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, INDEX_PARTITION, FLAG_IDX);
+
+ tree = new TxLogTree(pageMemory, null, treeRoot, reuseList1, ctx.failure(), true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+ reuseList.saveMetadata();
+ }
+
+ /**
+ *
+ * @param major Major version.
+ * @param minor Minor version.
+ * @return Transaction state for given version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public byte get(long major, long minor) throws IgniteCheckedException {
+ return get(new TxKey(major, minor));
+ }
+
+ /**
+ *
+ * @param key Transaction key.
+ * @return Transaction state for given version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public byte get(TxKey key) throws IgniteCheckedException {
+ TxRow row = tree.findOne(key);
+
+ return row == null ? TxState.NA : row.state();
+ }
+
+ /**
+ *
+ * @param key TxKey.
+ * @param state Transaction state for given version.
+ * @param primary Flag if this is a primary node.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void put(TxKey key, byte state, boolean primary) throws IgniteCheckedException {
+ Sync sync = syncObject(key);
+
+ try {
+ mgr.checkpointReadLock();
+
+ try {
+ synchronized (sync) {
+ tree.invoke(key, null, new TxLogUpdateClosure(key.major(), key.minor(), state, primary));
+ }
+ }
+ finally {
+ mgr.checkpointReadUnlock();
+ }
+ } finally {
+ evict(key, sync);
+ }
+ }
+
+ /**
+ * Removes all records less or equals to the given version.
+ *
+ * @param major Major version.
+ * @param minor Minor version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void removeUntil(long major, long minor) throws IgniteCheckedException {
+ TraversingClosure clo = new TraversingClosure(major, minor);
+
+ tree.iterate(LOWEST, clo, clo);
+
+ if (clo.rows != null) {
+ for (TxKey row : clo.rows) {
+ remove(row);
+ }
+ }
+ }
+
+ /** */
+ private void remove(TxKey key) throws IgniteCheckedException {
+ Sync sync = syncObject(key);
+
+ try {
+ mgr.checkpointReadLock();
+
+ try {
+ synchronized (sync) {
+ tree.removex(key);
+ }
+ }
+ finally {
+ mgr.checkpointReadUnlock();
+ }
+ } finally {
+ evict(key, sync);
+ }
+ }
+
+ /** */
+ private Sync syncObject(TxKey key) {
+ Sync sync = keyMap.get(key);
+
+ while (true) {
+ if (sync == null) {
+ Sync old = keyMap.putIfAbsent(key, sync = new Sync());
+
+ if (old == null)
+ return sync;
+ else
+ sync = old;
+ }
+ else {
+ int cntr = sync.counter;
+
+ while (cntr > 0) {
+ if (sync.casCounter(cntr, cntr + 1))
+ return sync;
+
+ cntr = sync.counter;
+ }
+
+ sync = keyMap.get(key);
+ }
+ }
+ }
+
+ /** */
+ private void evict(TxKey key, Sync sync) {
+ assert sync != null;
+
+ int cntr = sync.counter;
+
+ while (true) {
+ assert cntr > 0;
+
+ if (!sync.casCounter(cntr, cntr - 1)) {
+ cntr = sync.counter;
+
+ continue;
+ }
+
+ if (cntr == 1) {
+ boolean removed = keyMap.remove(key, sync);
+
+ assert removed;
+ }
+
+ break;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TraversingClosure extends TxKey implements BPlusTree.TreeRowClosure<TxKey, TxRow> {
+ /** */
+ private List<TxKey> rows;
+
+ /**
+ *
+ * @param major Major version.
+ * @param minor Minor version.
+ */
+ TraversingClosure(long major, long minor) {
+ super(major, minor);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(BPlusTree<TxKey, TxRow> tree, BPlusIO<TxKey> io, long pageAddr,
+ int idx) throws IgniteCheckedException {
+
+ if (rows == null)
+ rows = new ArrayList<>();
+
+ TxLogIO logIO = (TxLogIO)io;
+ int offset = io.offset(idx);
+
+ rows.add(new TxKey(logIO.getMajor(pageAddr, offset), logIO.getMinor(pageAddr, offset)));
+
+ return true;
+ }
+ }
+
+ /** */
+ private static class Sync {
+ /** */
+ private static final AtomicIntegerFieldUpdater<Sync> UPD = AtomicIntegerFieldUpdater.newUpdater(Sync.class, "counter");
+
+ /** */
+ volatile int counter = 1;
+
+ /** */
+ boolean casCounter(int old, int upd) {
+ return UPD.compareAndSet(this, old, upd);
+ }
+ }
+
+ /**
+ * TxLog update closure.
+ */
+ private static final class TxLogUpdateClosure implements IgniteTree.InvokeClosure<TxRow> {
+ /** */
+ private final long major;
+
+ /** */
+ private final long minor;
+
+ /** */
+ private final byte newState;
+
+ /** */
+ private final boolean primary;
+
+ /** */
+ private IgniteTree.OperationType treeOp;
+
+ /**
+ *
+ * @param major Coordinator version.
+ * @param minor Counter.
+ * @param newState New Tx newState.
+ * @param primary Flag if this is primary node.
+ */
+ TxLogUpdateClosure(long major, long minor, byte newState, boolean primary) {
+ assert major > MVCC_CRD_COUNTER_NA && minor > MVCC_COUNTER_NA && newState != TxState.NA;
+ this.major = major;
+ this.minor = minor;
+ this.newState = newState;
+ this.primary = primary;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void call(@Nullable TxRow row) {
+ if (row == null) {
+ valid();
+
+ return;
+ }
+
+ byte currState = row.state();
+
+ switch (currState) {
+ case TxState.NA:
+ checkNa(currState);
+
+ break;
+
+ case TxState.PREPARED:
+ checkPrepared(currState);
+
+ break;
+
+ case TxState.COMMITTED:
+ checkCommitted(currState);
+
+ break;
+
+ case TxState.ABORTED:
+ checkAborted(currState);
+
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown tx state: " + currState);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public TxRow newRow() {
+ return treeOp == IgniteTree.OperationType.PUT ? new TxRow(major, minor, newState) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTree.OperationType operationType() {
+ return treeOp;
+ }
+
+ /**
+ * Checks update possibility for {@code TxState.NA} tx status.
+ *
+ * @param currState Current tx state.
+ */
+ private void checkNa(byte currState) {
+ switch (newState) {
+ case TxState.ABORTED:
+ case TxState.PREPARED:
+ valid();
+
+ break;
+
+ case TxState.COMMITTED:
+ invalid(currState); // TODO IGNITE-8445
+
+ break;
+
+ default:
+ invalid(currState);
+ }
+ }
+
+ /**
+ * Checks update possibility for {@code TxState.PREPARED} status.
+ *
+ * @param currState Current tx state.
+ */
+ private void checkPrepared(byte currState) {
+ switch (newState) {
+ case TxState.ABORTED:
+ case TxState.COMMITTED:
+ valid();
+
+ break;
+
+ case TxState.PREPARED:
+ ignore();
+
+ break;
+
+ default:
+ invalid(currState);
+ }
+ }
+
+ /**
+ * Checks update possibility for {@code TxState.COMMITTED} status.
+ *
+ * @param currState Current tx state.
+ */
+ private void checkCommitted(byte currState) {
+ switch (newState) {
+ case TxState.COMMITTED:
+ ignore();
+
+ break;
+
+ case TxState.PREPARED:
+ if (primary)
+ ignore(); // In case when remote tx has updated the current state before.
+ else
+ invalid(currState);
+
+ break;
+
+ default:
+ invalid(currState);
+ }
+ }
+
+ /**
+ * Checks update possibility for {@code TxState.ABORTED} status.
+ *
+ * @param currState Current tx state.
+ */
+ private void checkAborted(byte currState) {
+ switch (newState) {
+ case TxState.ABORTED:
+ ignore();
+
+ break;
+
+ case TxState.PREPARED:
+ if (primary)
+ ignore(); // In case when remote tx has updated the current state before.
+ else
+ invalid(currState);
+
+ break;
+
+ default:
+ invalid(currState);
+ }
+ }
+
+ /**
+ * Action for valid tx status update.
+ */
+ private void valid() {
+ assert treeOp == null;
+
+ treeOp = IgniteTree.OperationType.PUT;
+ }
+
+ /**
+ * Action for invalid tx status update.
+ */
+ private void invalid(byte currState) {
+ assert treeOp == null;
+
+ throw new IllegalStateException("Unexpected new transaction state. [currState=" +
+ currState + ", newState=" + newState + ", cntr=" + minor +']');
+ }
+
+ /**
+ * Action for ignoring tx status update.
+ */
+ private void ignore() {
+ assert treeOp == null;
+
+ treeOp = IgniteTree.OperationType.NOOP;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java
new file mode 100644
index 0000000..e952b43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+/**
+ *
+ */
+public interface TxLogIO {
+ /**
+ * @param pageAddr Page address.
+ * @param off Item offset.
+ * @param row Row to compare with.
+ * @return Comparision result.
+ */
+ int compare(long pageAddr, int off, TxKey row);
+
+ /**
+ * @param pageAddr Page address.
+ * @param off Item offset.
+ * @return Major version
+ */
+ long getMajor(long pageAddr, int off);
+
+ /**
+ * @param pageAddr Page address.
+ * @param off Item offset.
+ * @param major Major version
+ */
+ void setMajor(long pageAddr, int off, long major);
+
+ /**
+ * @param pageAddr Page address.
+ * @param off Item offset.
+ * @return Minor version.
+ */
+ long getMinor(long pageAddr, int off);
+
+ /**
+ * @param pageAddr Page address.
+ * @param off Item offset.
+ * @param minor Minor version.
+ */
+ void setMinor(long pageAddr, int off, long minor);
+
+ /**
+ * @param pageAddr Page address.
+ * @param off Item offset.
+ * @return Transaction state.
+ */
+ byte getState(long pageAddr, int off);
+
+ /**
+ * @param pageAddr Page address.
+ * @param off Item offset.
+ * @param state Transaction state.
+ */
+ void setState(long pageAddr, int off, byte state);
+}