You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/30 15:07:01 UTC
[2/3] ignite git commit: ignite-3478
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java
new file mode 100644
index 0000000..397c408
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java
@@ -0,0 +1,1708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+import org.jsr166.ConcurrentHashMap8;
+
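+/**
+ * Stand-alone MVCC prototype: an in-memory {@link TestCluster} whose transactions and queries are versioned by
+ * counters handed out by a single {@link Coordinator}. The {@code main} methods drive it as a consistency test.
+ */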
+public class MvccTestApp2 {
+ /** When {@code true}, operations append trace records to {@code TestDebugLog.msgs}. */
+ private static final boolean DEBUG_LOG = false;
+
+ /** When {@code true}, reads in {@code main} go through {@code sqlAll()} instead of {@code getAll()}. */
+ private static final boolean SQL = false;
+
+    /**
+     * Single-threaded scenario: seeds three accounts with 10, performs three transfers out of account 0,
+     * prints every balance and their sum as returned by {@code sqlAll()}, then runs a cleanup and queries again.
+     *
+     * @param args Ignored.
+     * @throws Exception If failed.
+     */
+    public static void main1(String[] args) throws Exception {
+        final int accounts = 3;
+        final int initVal = 10;
+
+        final TestCluster cluster = new TestCluster(1);
+
+        final Map<Object, Object> seed = new TreeMap<>();
+
+        for (int acc = 0; acc < accounts; acc++)
+            seed.put(acc, initVal);
+
+        cluster.txPutAll(seed);
+
+        // Two transfers 0 -> 1 followed by one transfer 0 -> 2.
+        for (int to : new int[] {1, 1, 2})
+            cluster.txTransfer(0, to, true);
+
+        // The result is not inspected; the query is still executed.
+        cluster.sqlAll();
+
+        System.out.println();
+
+        Map<Object, Object> snapshot = cluster.sqlAll();
+
+        int total = 0;
+
+        for (int acc = 0; acc < accounts; acc++) {
+            Integer balance = (Integer)snapshot.get(acc);
+
+            total += balance;
+
+            System.out.println("Val: " + balance);
+        }
+
+        System.out.println("Sum: " + total);
+
+        cluster.cleanup();
+
+        // Query once more after cleanup; the result is not inspected.
+        cluster.sqlAll();
+
+        System.out.println();
+    }
+
+    /**
+     * Single-threaded scenario for removes: seeds three accounts with 10, moves the whole balance of account 0
+     * into account 1 (removing account 0), prints the values returned by {@code sqlAll()} and their sum, then
+     * runs a cleanup and queries again.
+     *
+     * @param args Ignored.
+     * @throws Exception If failed.
+     */
+    public static void main0(String[] args) throws Exception {
+        final int accounts = 3;
+        final int initVal = 10;
+
+        final TestCluster cluster = new TestCluster(1);
+
+        final Map<Object, Object> seed = new TreeMap<>();
+
+        for (int acc = 0; acc < accounts; acc++)
+            seed.put(acc, initVal);
+
+        cluster.txPutAll(seed);
+
+        cluster.txRemoveTransfer(0, 1);
+
+        Map<Object, Object> snapshot = cluster.sqlAll();
+
+        int total = 0;
+
+        for (Object raw : snapshot.values()) {
+            Integer balance = (Integer)raw;
+
+            // A null value is skipped in the sum but still printed.
+            if (balance != null)
+                total += balance;
+
+            System.out.println("Val: " + balance);
+        }
+
+        System.out.println("Sum: " + total);
+
+        cluster.cleanup();
+
+        // Query once more after cleanup; the result is not inspected.
+        cluster.sqlAll();
+
+        System.out.println();
+    }
+
+    /**
+     * Runs the concurrent bank-transfer consistency check.
+     * <p>
+     * Each of the 1000 iterations builds a fresh single-node {@link TestCluster}, seeds {@code ACCOUNTS} accounts
+     * with {@code START_VAL}, then for about two seconds runs one cleanup thread, {@code READ_THREADS} readers and
+     * {@code UPDATE_THREADS} updaters. Readers check that every snapshot contains all accounts and that the
+     * balances add up to {@code ACCOUNTS * START_VAL}.
+     * <p>
+     * Any {@link Throwable} escaping a worker is printed and marks the run as failed, so a dead cleanup, reader
+     * or updater thread cannot go unnoticed.
+     *
+     * @param args Ignored.
+     * @throws Exception If the main thread is interrupted while sleeping or joining the workers.
+     */
+    public static void main(String[] args) throws Exception {
+        final AtomicBoolean err = new AtomicBoolean();
+
+        final int READ_THREADS = 4;
+        final int UPDATE_THREADS = 4;
+        final int ACCOUNTS = 50;
+
+        final int START_VAL = 100000;
+
+        for (int iter = 0; iter < 1000; iter++) {
+            System.out.println("Iteration [readThreads=" + READ_THREADS +
+                ", updateThreads=" + UPDATE_THREADS + ", accounts=" + ACCOUNTS + ", iter=" + iter + ']');
+
+            final TestCluster cluster = new TestCluster(1);
+
+            final Map<Object, Object> data = new TreeMap<>();
+
+            for (int i = 0; i < ACCOUNTS; i++)
+                data.put(i, START_VAL);
+
+            cluster.txPutAll(data);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            List<Thread> threads = new ArrayList<>();
+
+            Thread cleanupThread = new Thread(new Runnable() {
+                @Override public void run() {
+                    Thread.currentThread().setName("cleanup");
+
+                    try {
+                        while (!stop.get()) {
+                            cluster.cleanup();
+
+                            Thread.sleep(1);
+                        }
+                    }
+                    catch (Throwable e) {
+                        onWorkerFailure(e, stop, err);
+                    }
+                }
+            });
+
+            threads.add(cleanupThread);
+
+            cleanupThread.start();
+
+            // Switches readers and updaters to the variant that also removes accounts.
+            final boolean REMOVES = false;
+
+            for (int i = 0; i < READ_THREADS; i++) {
+                final int id = i;
+
+                Thread thread = new Thread(new Runnable() {
+                    @Override public void run() {
+                        Thread.currentThread().setName("read" + id);
+
+                        int cnt = 0;
+
+                        try {
+                            while (!stop.get()) {
+                                Map<Object, Object> qryData = SQL ? cluster.sqlAll() : cluster.getAll(data.keySet());
+
+                                cnt++;
+
+                                int sum = 0;
+
+                                if (REMOVES) {
+                                    for (Map.Entry<Object, Object> e : qryData.entrySet()) {
+                                        Integer val = (Integer)e.getValue();
+
+                                        if (val != null)
+                                            sum += val;
+                                        else
+                                            System.out.println("With null");
+                                    }
+                                }
+                                else {
+                                    for (int i = 0; i < ACCOUNTS; i++) {
+                                        Integer val = (Integer)qryData.get(i);
+
+                                        if (val == null) {
+                                            // Only the thread that wins the CAS reports, so the dump is written once.
+                                            if (stop.compareAndSet(false, true)) {
+                                                err.set(true);
+
+                                                TestDebugLog.printAllAndExit("No value for key: " + i);
+                                            }
+
+                                            return;
+                                        }
+
+                                        sum += val;
+                                    }
+                                }
+
+                                if (sum != ACCOUNTS * START_VAL) {
+                                    if (stop.compareAndSet(false, true)) {
+                                        err.set(true);
+
+                                        TestDebugLog.printAllAndExit("Invalid get sum: " + sum);
+                                    }
+                                }
+                            }
+                        }
+                        catch (Throwable e) {
+                            onWorkerFailure(e, stop, err);
+                        }
+
+                        System.out.println("Get cnt: " + cnt);
+                    }
+                });
+
+                threads.add(thread);
+
+                thread.start();
+            }
+
+            for (int i = 0; i < UPDATE_THREADS; i++) {
+                final int id = i;
+
+                Thread thread = new Thread(new Runnable() {
+                    @Override public void run() {
+                        Thread.currentThread().setName("update" + id);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        try {
+                            while (!stop.get()) {
+                                int id1 = rnd.nextInt(ACCOUNTS);
+
+                                int id2 = rnd.nextInt(ACCOUNTS);
+
+                                while (id2 == id1)
+                                    id2 = rnd.nextInt(ACCOUNTS);
+
+                                if (REMOVES) {
+                                    if (rnd.nextBoolean())
+                                        cluster.txRemoveTransfer(id1, id2);
+                                    else
+                                        cluster.txTransfer(id1, id2, rnd.nextBoolean());
+                                }
+                                else {
+                                    if (id1 > id2) {
+                                        int tmp = id1;
+                                        id1 = id2;
+                                        id2 = tmp;
+                                    }
+
+                                    cluster.txTransfer(id1, id2, rnd.nextBoolean());
+                                }
+                            }
+                        }
+                        catch (Throwable e) {
+                            onWorkerFailure(e, stop, err);
+                        }
+                    }
+                });
+
+                threads.add(thread);
+
+                thread.start();
+            }
+
+            long endTime = System.currentTimeMillis() + 2_000;
+
+            while (!stop.get()) {
+                Thread.sleep(1000);
+
+                if (System.currentTimeMillis() >= endTime)
+                    break;
+            }
+
+            stop.set(true);
+
+            for (Thread thread : threads)
+                thread.join();
+
+            Map<Object, Object> qryData = SQL ? cluster.sqlAll() : cluster.getAll(data.keySet());
+
+            int sum = 0;
+
+            for (int i = 0; i < ACCOUNTS; i++) {
+                Integer val = (Integer)qryData.get(i);
+
+                System.out.println("Val " + val);
+
+                if (val != null)
+                    sum += val;
+            }
+
+            System.out.println("Sum=" + sum + ", expSum=" + (ACCOUNTS * START_VAL));
+
+            if (err.get()) {
+                System.out.println("Error!");
+
+                System.exit(1);
+            }
+
+            TestDebugLog.clear();
+        }
+    }
+
+    /**
+     * Reports a worker thread failure: prints the stack trace, marks the run as failed and stops all workers.
+     *
+     * @param e Failure that escaped the worker body.
+     * @param stop Flag polled by all workers of the current iteration.
+     * @param err Flag checked by {@code main} once the workers have been joined.
+     */
+    private static void onWorkerFailure(Throwable e, AtomicBoolean stop, AtomicBoolean err) {
+        e.printStackTrace();
+
+        err.set(true);
+        stop.set(true);
+    }
+
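+ /**
+ * In-memory cluster: a list of {@link Node}s plus one {@link Coordinator}, with transactional updates and
+ * versioned reads implemented on top of them.
+ */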
+ static class TestCluster {
+ /** Nodes of the cluster; {@code nodeForKey} maps each key to one of them. */
+ final List<Node> nodes = new ArrayList<>();
+
+ /** Coordinator that hands out transaction counters and query/cleanup versions. */
+ final Coordinator crd;
+
+ /** Source of transaction ids; starts at 10_000. */
+ final AtomicLong txIdGen = new AtomicLong(10_000);
+
+        /**
+         * Creates the coordinator and {@code nodesNum} nodes with indexes {@code 0..nodesNum-1}.
+         *
+         * @param nodesNum Number of nodes to create.
+         */
+        TestCluster(int nodesNum) {
+            crd = new Coordinator();
+
+            int idx = 0;
+
+            while (idx < nodesNum)
+                nodes.add(new Node(idx++));
+        }
+
+ void cleanup() {
+ CoordinatorCounter cntr = crd.cleanupVersion();
+
+ for (Node node : nodes)
+ node.dataStore.cleanup(cntr);
+ }
+
+        /**
+         * Writes all entries of {@code data} in one transaction: locks every key in the map's iteration order,
+         * obtains a counter from the coordinator, stores the new versions, unlocks and reports the transaction
+         * as done.
+         * <p>
+         * Entry locks are released in a {@code finally} block, so a failure in the middle of the transaction
+         * does not leave keys locked. On failure {@code txDone} is deliberately not called.
+         *
+         * @param data Key-value pairs to store.
+         */
+        void txPutAll(Map<Object, Object> data) {
+            TxId txId = new TxId(txIdGen.incrementAndGet());
+
+            // Keys locked so far, in lock order, with the node that owns each of them.
+            Map<Object, Node> mappedEntries = new LinkedHashMap<>();
+
+            CoordinatorCounter cntr;
+
+            try {
+                for (Object key : data.keySet()) {
+                    Node node = nodes.get(nodeForKey(key));
+
+                    node.dataStore.lockEntry(key);
+
+                    mappedEntries.put(key, node);
+                }
+
+                // The counter is requested only after all locks are held.
+                cntr = crd.nextTxCounter(txId);
+
+                MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId);
+
+                for (Map.Entry<Object, Node> e : mappedEntries.entrySet())
+                    e.getValue().dataStore.updateEntry(e.getKey(), data.get(e.getKey()), mvccVer);
+            }
+            finally {
+                for (Map.Entry<Object, Node> e : mappedEntries.entrySet())
+                    e.getValue().dataStore.unlockEntry(e.getKey());
+            }
+
+            crd.txDone(txId, cntr.cntr);
+        }
+
+        /**
+         * Moves one unit between two accounts in a single transaction.
+         * <p>
+         * Both keys are locked in ascending key order and their latest values are read under the locks. The
+         * transfer is applied only if both accounts exist and the source balance is positive; otherwise the
+         * transaction completes without writing anything.
+         * <p>
+         * Entry locks are released in a {@code finally} block, so a failure in the middle of the transaction
+         * does not leave keys locked. On failure {@code txDone} is deliberately not called.
+         *
+         * @param id1 First account.
+         * @param id2 Second account.
+         * @param fromFirst If {@code true} the unit moves from {@code id1} to {@code id2}, otherwise from
+         *      {@code id2} to {@code id1}.
+         */
+        void txTransfer(Integer id1, Integer id2, boolean fromFirst) {
+            TreeSet<Integer> keys = new TreeSet<>();
+
+            keys.add(id1);
+            keys.add(id2);
+
+            TxId txId = new TxId(txIdGen.incrementAndGet());
+
+            // Keys locked so far, in lock order, with the node that owns each of them.
+            Map<Object, Node> mappedEntries = new LinkedHashMap<>();
+
+            CoordinatorCounter cntr;
+
+            try {
+                Map<Object, Object> vals = new HashMap<>();
+
+                for (Object key : keys) {
+                    Node node = nodes.get(nodeForKey(key));
+
+                    node.dataStore.lockEntry(key);
+
+                    // Track the key immediately so it is unlocked even if reading its value fails.
+                    mappedEntries.put(key, node);
+
+                    vals.put(key, node.dataStore.lastValue(key));
+                }
+
+                cntr = crd.nextTxCounter(txId);
+
+                Integer curVal1 = (Integer)vals.get(id1);
+                Integer curVal2 = (Integer)vals.get(id2);
+
+                boolean update = false;
+
+                Integer newVal1 = null;
+                Integer newVal2 = null;
+
+                if (curVal1 != null && curVal2 != null) {
+                    if (fromFirst) {
+                        if (curVal1 > 0) {
+                            update = true;
+
+                            newVal1 = curVal1 - 1;
+                            newVal2 = curVal2 + 1;
+                        }
+                    }
+                    else if (curVal2 > 0) {
+                        update = true;
+
+                        newVal1 = curVal1 + 1;
+                        newVal2 = curVal2 - 1;
+                    }
+                }
+
+                if (update) {
+                    Map<Object, Object> newVals = new HashMap<>();
+
+                    newVals.put(id1, newVal1);
+                    newVals.put(id2, newVal2);
+
+                    MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId);
+
+                    if (DEBUG_LOG)
+                        TestDebugLog.msgs.add(new TestDebugLog.Msg6("update", txId, id1, newVal1, id2, newVal2, cntr));
+
+                    for (Map.Entry<Object, Node> e : mappedEntries.entrySet())
+                        e.getValue().dataStore.updateEntry(e.getKey(), newVals.get(e.getKey()), mvccVer);
+                }
+            }
+            finally {
+                for (Map.Entry<Object, Node> e : mappedEntries.entrySet())
+                    e.getValue().dataStore.unlockEntry(e.getKey());
+            }
+
+            crd.txDone(txId, cntr.cntr);
+        }
+
+        /**
+         * Moves the whole balance of {@code from} into {@code to} and removes {@code from} (by writing a
+         * {@code null} value for it) in a single transaction.
+         * <p>
+         * Both keys are locked in ascending key order and their latest values are read under the locks. Nothing
+         * is written unless both accounts currently have a value.
+         * <p>
+         * Entry locks are released in a {@code finally} block, so a failure in the middle of the transaction
+         * does not leave keys locked. On failure {@code txDone} is deliberately not called.
+         *
+         * @param from Account to drain and remove.
+         * @param to Account that receives the balance.
+         */
+        void txRemoveTransfer(Integer from, Integer to) {
+            TreeSet<Integer> keys = new TreeSet<>();
+
+            keys.add(from);
+            keys.add(to);
+
+            TxId txId = new TxId(txIdGen.incrementAndGet());
+
+            // Keys locked so far, in lock order, with the node that owns each of them.
+            Map<Object, Node> mappedEntries = new LinkedHashMap<>();
+
+            CoordinatorCounter cntr;
+
+            try {
+                Map<Object, Object> vals = new HashMap<>();
+
+                for (Object key : keys) {
+                    Node node = nodes.get(nodeForKey(key));
+
+                    node.dataStore.lockEntry(key);
+
+                    // Track the key immediately so it is unlocked even if reading its value fails.
+                    mappedEntries.put(key, node);
+
+                    vals.put(key, node.dataStore.lastValue(key));
+                }
+
+                cntr = crd.nextTxCounter(txId);
+
+                Integer fromVal = (Integer)vals.get(from);
+                Integer toVal = (Integer)vals.get(to);
+
+                if (fromVal != null && toVal != null) {
+                    Map<Object, Object> newVals = new HashMap<>();
+
+                    newVals.put(from, null);
+                    newVals.put(to, fromVal + toVal);
+
+                    MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId);
+
+                    if (DEBUG_LOG)
+                        TestDebugLog.msgs.add(new TestDebugLog.Msg6("remove", txId, from, fromVal, to, toVal, cntr));
+
+                    for (Map.Entry<Object, Node> e : mappedEntries.entrySet())
+                        e.getValue().dataStore.updateEntry(e.getKey(), newVals.get(e.getKey()), mvccVer);
+                }
+            }
+            finally {
+                for (Map.Entry<Object, Node> e : mappedEntries.entrySet())
+                    e.getValue().dataStore.unlockEntry(e.getKey());
+            }
+
+            crd.txDone(txId, cntr.cntr);
+
+            if (DEBUG_LOG)
+                TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, cntr.cntr));
+        }
+
+        /**
+         * Prints, for every key in each node's {@code mainIdx}, its value there, the number of versions held for
+         * it in {@code mvccIdx} and the total size of the node's SQL index.
+         */
+        public void dumpMvccInfo() {
+            for (Node node : nodes) {
+                DataStore store = node.dataStore;
+
+                int sqlVals = store.mvccSqlIdx.size();
+
+                for (Map.Entry<Object, MvccValue> committed : store.mainIdx.entrySet()) {
+                    List<MvccValue> versions = store.mvccIdx.get(committed.getKey());
+
+                    int mvccVals;
+
+                    if (versions == null)
+                        mvccVals = 0;
+                    else {
+                        synchronized (versions) {
+                            mvccVals = versions.size();
+                        }
+                    }
+
+                    System.out.println("Mvcc info [key=" + committed.getKey() +
+                        ", val=" + committed.getValue() +
+                        ", mvccVals=" + mvccVals +
+                        ", sqlVals=" + sqlVals + ']');
+                }
+            }
+        }
+
+ public Map<Object, Object> sqlAll() {
+ MvccQueryVersion qryVer = crd.queryVersion();
+
+ Map<Object, Object> res = new HashMap<>();
+
+ for (Node node : nodes) {
+ Map<Object, Object> nodeRes = node.dataStore.sqlQuery(qryVer);
+
+ res.putAll(nodeRes);
+ }
+
+ crd.queryDone(qryVer.cntr);
+
+ if (DEBUG_LOG) {
+ TestDebugLog.msgs.add(new TestDebugLog.Msg3("sqlAll", qryVer.cntr, qryVer.activeTxs, res));
+ }
+
+ return res;
+ }
+
+ public Map<Object, Object> getAll(Set<?> keys) {
+ MvccQueryVersion qryVer = crd.queryVersion();
+
+ Map<Object, Object> res = new HashMap<>();
+
+ for (Object key : keys) {
+ int nodeIdx = nodeForKey(key);
+
+ Node node = nodes.get(nodeIdx);
+
+ Object val = node.dataStore.get(key, qryVer);
+
+ res.put(key, val);
+ }
+
+ crd.queryDone(qryVer.cntr);
+
+ if (DEBUG_LOG) {
+ TestDebugLog.msgs.add(new TestDebugLog.Msg3("getAll", qryVer.cntr, qryVer.activeTxs, res));
+ }
+
+ return res;
+ }
+
+        /**
+         * @param key Key.
+         * @return Index in {@code nodes} of the node the key is mapped to.
+         */
+        private int nodeForKey(Object key) {
+            int hash = U.safeAbs(key.hashCode());
+
+            return hash % nodes.size();
+        }
+ }
+
+    /**
+     * Cluster node: an index plus its own {@link DataStore}.
+     */
+    static class Node {
+        /** Storage of this node. */
+        final DataStore dataStore;
+
+        /** Index the node was created with. */
+        final int nodexIdx;
+
+        /**
+         * @param idx Node index.
+         */
+        public Node(int idx) {
+            nodexIdx = idx;
+            dataStore = new DataStore();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return new StringBuilder("Node [idx=").append(nodexIdx).append(']').toString();
+        }
+    }
+
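+ /**
+ * Hands out transaction counters and query/cleanup versions, and tracks the transactions and queries that
+ * are currently in flight.
+ */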
+ static class Coordinator {
+ /** Last counter handed out by {@code nextTxCounter}; -1 until the first transaction. */
+ private final AtomicLong cntr = new AtomicLong(-1);
+
+ /** Commit counter, advanced through {@code setIfGreater} in {@code txDone}; starts at -1. */
+ private final GridAtomicLong commitCntr = new GridAtomicLong(-1);
+
+ /** Number of in-flight queries per query counter value. */
+ private final ConcurrentHashMap8<Long, QueryCounter> activeQueries = new ConcurrentHashMap8<>();
+
+ /** Transactions between {@code nextTxCounter} and {@code txDone}, mapped to the counter recorded at registration. */
+ @GridToStringInclude
+ private final ConcurrentHashMap8<TxId, Long> activeTxs = new ConcurrentHashMap8<>();
+
+        /**
+         * Registers the transaction as active and assigns it the next counter.
+         * <p>
+         * The transaction is registered with {@code current + 1} before the counter is incremented, so under
+         * concurrency the recorded value may be lower than the counter actually returned.
+         * NOTE(review): presumably intentional, so that the transaction is already visible to {@code minActive}
+         * by the time its counter exists - confirm.
+         *
+         * @param txId Transaction id.
+         * @return Counter assigned to the transaction.
+         */
+        CoordinatorCounter nextTxCounter(TxId txId) {
+            activeTxs.put(txId, cntr.get() + 1);
+
+            return new CoordinatorCounter(cntr.incrementAndGet());
+        }
+
+        /**
+         * Unregisters a finished transaction and offers its counter to the commit counter.
+         *
+         * @param txId Transaction id; must have been registered by {@code nextTxCounter}.
+         * @param cntr Counter the transaction ran with.
+         */
+        void txDone(TxId txId, long cntr) {
+            Long registered = activeTxs.remove(txId);
+
+            assert registered != null;
+
+            commitCntr.setIfGreater(cntr);
+        }
+
+ private GridAtomicLong minActive0 = new GridAtomicLong(0);
+
+        /**
+         * Finds the smallest counter recorded for the currently active transactions.
+         * <p>
+         * The result is also offered to {@code minActive0}; when {@code setIfGreater} reports {@code false} the
+         * value held by {@code minActive0} is returned instead of the freshly computed one.
+         *
+         * @param txs Optional collector that receives the ids of all active transactions; may be {@code null}.
+         * @return Minimal active counter, or {@code null} if no transaction is active.
+         */
+        private Long minActive(Set<TxId> txs) {
+            Long min = null;
+
+            for (Map.Entry<TxId, Long> tx : activeTxs.entrySet()) {
+                if (txs != null)
+                    txs.add(tx.getKey());
+
+                long txCntr = tx.getValue();
+
+                if (min == null || txCntr < min)
+                    min = txCntr;
+            }
+
+            if (min != null && !minActive0.setIfGreater(min))
+                return minActive0.get();
+
+            return min;
+        }
+
+ static class QueryCounter extends AtomicInteger {
+ public QueryCounter(int initialValue) {
+ super(initialValue);
+ }
+
+ boolean increment2() {
+ for (;;) {
+ int current = get();
+ int next = current + 1;
+
+ if (current == 0)
+ return false;
+
+ if (compareAndSet(current, next))
+ return true;
+ }
+ }
+ }
+
+ private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+        /**
+         * Picks the version a new query should read at and registers the query as active for that version.
+         * <p>
+         * The version is the commit counter, lowered to {@code minActive - 1} when an active transaction has a
+         * smaller recorded counter. The whole operation runs under the read lock, which is released in a
+         * {@code finally} block.
+         *
+         * @return Query version holding the chosen counter and the ids of the transactions active at that moment.
+         */
+        MvccQueryVersion queryVersion() {
+            rwLock.readLock().lock();
+
+            try {
+                long useCntr = commitCntr.get();
+
+                Set<TxId> txs = new HashSet<>();
+
+                Long minActive = minActive(txs);
+
+                if (minActive != null && minActive < useCntr)
+                    useCntr = minActive - 1;
+
+                MvccQueryVersion qryVer = new MvccQueryVersion(new CoordinatorCounter(useCntr), txs);
+
+                for (;;) {
+                    QueryCounter qryCnt = activeQueries.get(useCntr);
+
+                    if (qryCnt != null) {
+                        // A counter that already dropped to zero is retired: remove it and retry with a new one.
+                        if (!qryCnt.increment2()) {
+                            activeQueries.remove(useCntr, qryCnt);
+
+                            continue;
+                        }
+                    }
+                    else {
+                        qryCnt = new QueryCounter(1);
+
+                        // Lost the race to another query registering the same version: retry and increment.
+                        if (activeQueries.putIfAbsent(useCntr, qryCnt) != null)
+                            continue;
+                    }
+
+                    break;
+                }
+
+                return qryVer;
+            }
+            finally {
+                rwLock.readLock().unlock();
+            }
+        }
+
+ void queryDone(CoordinatorCounter cntr) {
+ AtomicInteger qryCnt = activeQueries.get(cntr.cntr);
+
+ assert qryCnt != null : cntr.cntr;
+
+ int left = qryCnt.decrementAndGet();
+
+ assert left >= 0 : left;
+
+ if (left == 0)
+ activeQueries.remove(cntr.cntr, qryCnt);
+ }
+
+        /**
+         * Computes the version up to which old data may be cleaned up.
+         * <p>
+         * Starts from the commit counter, lowers it to {@code minActive - 1} when an active transaction has a
+         * smaller recorded counter, and then below every active query version that is not above the current
+         * candidate. Runs under the write lock, which is released in a {@code finally} block.
+         *
+         * @return Cleanup version.
+         */
+        CoordinatorCounter cleanupVersion() {
+            rwLock.writeLock().lock();
+
+            try {
+                long useCntr = commitCntr.get();
+
+                Long minActive = minActive(null);
+
+                if (minActive != null && minActive < useCntr)
+                    useCntr = minActive - 1;
+
+                for (Long qryCntr : activeQueries.keySet()) {
+                    if (qryCntr <= useCntr)
+                        useCntr = qryCntr - 1;
+                }
+
+                return new CoordinatorCounter(useCntr);
+            }
+            finally {
+                rwLock.writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            final String str = S.toString(Coordinator.class, this);
+
+            return str;
+        }
+ }
+
+ /**
+ *
+ */
+ static class CoordinatorCounter implements Comparable<CoordinatorCounter> {
+ /** */
+ private final long topVer; // TODO
+
+ /** */
+ private final long cntr;
+
+ CoordinatorCounter(long cntr) {
+ this.topVer = 1;
+ this.cntr = cntr;
+ }
+
+ @Override public int compareTo(CoordinatorCounter o) {
+ return Long.compare(cntr, o.cntr);
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ CoordinatorCounter that = (CoordinatorCounter)o;
+
+ return cntr == that.cntr;
+ }
+
+ @Override public int hashCode() {
+ return (int)(cntr ^ (cntr >>> 32));
+ }
+
+ @Override public String toString() {
+ return "Cntr [c=" + cntr + ']';
+ }
+ }
+
+    /**
+     * Version stamped on a written value: the coordinator counter plus the id of the writing transaction.
+     */
+    static class MvccUpdateVersion {
+        /** Counter assigned to the writing transaction. */
+        @GridToStringInclude
+        final CoordinatorCounter cntr;
+
+        /** Id of the writing transaction. */
+        @GridToStringInclude
+        final TxId txId;
+
+        /**
+         * @param cntr Counter assigned to the transaction; must not be {@code null}.
+         * @param txId Transaction id.
+         */
+        MvccUpdateVersion(CoordinatorCounter cntr, TxId txId) {
+            assert cntr != null;
+
+            this.txId = txId;
+            this.cntr = cntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            final String str = S.toString(MvccUpdateVersion.class, this);
+
+            return str;
+        }
+    }
+
+    /**
+     * Version a query reads at: the chosen counter plus the ids of the transactions that were active when the
+     * version was obtained.
+     */
+    static class MvccQueryVersion {
+        /** Counter the query reads at. */
+        @GridToStringInclude
+        final CoordinatorCounter cntr;
+
+        /** Ids of transactions active when the version was obtained. */
+        @GridToStringInclude
+        final Collection<TxId> activeTxs;
+
+        /**
+         * @param cntr Counter the query reads at.
+         * @param activeTxs Ids of active transactions.
+         */
+        MvccQueryVersion(CoordinatorCounter cntr, Collection<TxId> activeTxs) {
+            this.activeTxs = activeTxs;
+            this.cntr = cntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            final String str = S.toString(MvccQueryVersion.class, this);
+
+            return str;
+        }
+    }
+
+    /**
+     * Transaction identifier; equality and hash code are based on the numeric id.
+     */
+    static class TxId {
+        /** Numeric id. */
+        @GridToStringInclude
+        final long id;
+
+        /**
+         * @param id Numeric id.
+         */
+        TxId(long id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            return o != null && o.getClass() == getClass() && ((TxId)o).id == id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Long.valueOf(id).hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            final String str = S.toString(TxId.class, this);
+
+            return str;
+        }
+    }
+
+    /**
+     * Key of the SQL index. Keys are ordered by value ({@code null} values first), then by entry key, then by
+     * counter.
+     */
+    static class SqlKey implements Comparable<SqlKey> {
+        /** Entry key. */
+        final Comparable key;
+
+        /** Entry value; {@code null} for a removed entry. */
+        final Comparable val;
+
+        /** Counter of the version this index row belongs to. */
+        final CoordinatorCounter cntr;
+
+        /**
+         * @param key Entry key; must be {@link Comparable}.
+         * @param val Entry value; must be {@link Comparable} or {@code null}.
+         * @param cntr Version counter.
+         */
+        public SqlKey(Object key, Object val, CoordinatorCounter cntr) {
+            this.cntr = cntr;
+            this.key = (Comparable)key;
+            this.val = (Comparable)val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull SqlKey o) {
+            int res;
+
+            if (val == null)
+                res = o.val == null ? 0 : -1;
+            else if (o.val == null)
+                res = 1;
+            else
+                res = val.compareTo(o.val);
+
+            if (res != 0)
+                return res;
+
+            res = key.compareTo(o.key);
+
+            return res != 0 ? res : cntr.compareTo(o.cntr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return new StringBuilder("SqlKey [key=").append(key).append(", val=").append(val).append(']').toString();
+        }
+    }
+
+ /**
+ * Node-local storage: per-key locks, the main index, the per-key version lists and the SQL index.
+ */
+ static class DataStore {
+ /** Per-key locks, created lazily by {@code lock(Object)}. */
+ private final ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>();
+
+ /** Value per key as last folded in by {@code cleanup}; written only there. */
+ final ConcurrentHashMap<Object, MvccValue> mainIdx = new ConcurrentHashMap<>();
+
+ /** Per-key list of versions not yet folded into {@code mainIdx}, oldest first; accessed under the list's monitor. */
+ final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new ConcurrentHashMap<>();
+
+ /** Index of value versions ordered by {@code SqlKey}; scanned by {@code sqlQuery}. */
+ final ConcurrentSkipListMap<SqlKey, MvccSqlValue> mvccSqlIdx = new ConcurrentSkipListMap<>();
+
+ void cleanup(CoordinatorCounter cleanupCntr) { // Folds versions with counter <= cleanupCntr into mainIdx and drops the superseded ones.
+ for (Map.Entry<Object, List<MvccValue>> e : mvccIdx.entrySet()) {
+ lockEntry(e.getKey());
+ // The per-key lock is held for the whole fold and released in the finally block below.
+ try {
+ List<MvccValue> list = e.getValue();
+ // Same monitor that updateEntry, get and lastValue synchronize on.
+ synchronized (list) {
+ for (int i = list.size() - 1; i >= 0; i--) {
+ MvccValue val = list.get(i);
+ // Scanning from the tail: the first match is the most recently added version at or below cleanupCntr.
+ if (val.ver.cntr.compareTo(cleanupCntr) <= 0) {
+ if (DEBUG_LOG) {
+ TestDebugLog.msgs.add(new TestDebugLog.Msg6_1("cleanup",
+ e.getKey(), val.val, val.ver, cleanupCntr.cntr, null, null));
+ }
+ // prev is whatever mainIdx held for the key before this fold.
+ MvccValue prev;
+ // A null value marks a removed entry, so the key is dropped from mainIdx instead of stored.
+ if (val.val != null)
+ prev = mainIdx.put(e.getKey(), val);
+ else
+ prev = mainIdx.remove(e.getKey());
+ // The SQL index row of the previous mainIdx value is no longer needed.
+ if (prev != null) {
+ SqlKey key = new SqlKey(e.getKey(), prev.val, prev.ver.cntr);
+
+ MvccSqlValue old = mvccSqlIdx.remove(key);
+
+ assert old != null;
+ }
+ // Drop list elements 0..i (the folded version and everything before it).
+ for (int j = 0; j <= i; j++) {
+ MvccValue rmvd = list.remove(0);
+
+ assert rmvd != null;
+ // The folded version (j == i) keeps its SQL index row unless it is a removal marker.
+ if (j != i || rmvd.val == null) {
+ SqlKey key = new SqlKey(e.getKey(), rmvd.val, rmvd.ver.cntr);
+
+ MvccSqlValue old = mvccSqlIdx.remove(key);
+
+ assert old != null;
+ }
+ }
+
+ if (list.isEmpty())
+ mvccIdx.remove(e.getKey());
+ // Everything at or below cleanupCntr for this key has been handled.
+ break;
+ }
+ }
+ }
+ }
+ finally {
+ unlockEntry(e.getKey());
+ }
+ }
+ }
+
+        /**
+         * Acquires the lock of the given key, creating the lock on first use.
+         *
+         * @param key Key.
+         */
+        void lockEntry(Object key) {
+            lock(key).lock();
+        }
+
+        /**
+         * Releases the lock of the given key.
+         *
+         * @param key Key.
+         */
+        void unlockEntry(Object key) {
+            lock(key).unlock();
+        }
+
+        /**
+         * Appends a new version of the key's value and updates the SQL index accordingly.
+         * <p>
+         * The SQL index row of the previous value (the last element of the version list, or the {@code mainIdx}
+         * value if the list was empty) is rewritten so that it records the superseding version; a fresh row is
+         * then added for the new value.
+         *
+         * @param key Key.
+         * @param val New value; {@code null} marks a removal.
+         * @param ver Version of the update.
+         */
+        void updateEntry(Object key, Object val, MvccUpdateVersion ver) {
+            List<MvccValue> versions = mvccIdx.get(key);
+
+            if (versions == null) {
+                versions = new ArrayList<>();
+
+                Object raced = mvccIdx.putIfAbsent(key, versions);
+
+                assert raced == null;
+            }
+
+            MvccValue prev = null;
+
+            synchronized (versions) {
+                int size = versions.size();
+
+                if (size > 0)
+                    prev = versions.get(size - 1);
+
+                versions.add(new MvccValue(val, ver));
+            }
+
+            if (prev == null)
+                prev = mainIdx.get(key);
+
+            if (prev != null) {
+                MvccSqlValue replaced = mvccSqlIdx.put(
+                    new SqlKey(key, prev.val, prev.ver.cntr),
+                    new MvccSqlValue(prev.val, prev.ver, ver));
+
+                // The previous value is expected to have an index row already.
+                assert replaced != null;
+            }
+
+            mvccSqlIdx.put(new SqlKey(key, val, ver.cntr), new MvccSqlValue(val, ver, null));
+        }
+
+        /**
+         * @param key Key.
+         * @return Value of the newest version in the key's version list, or the {@code mainIdx} value when the
+         *      list is missing or empty; {@code null} if neither exists.
+         */
+        Object lastValue(Object key) {
+            List<MvccValue> versions = mvccIdx.get(key);
+
+            if (versions != null) {
+                synchronized (versions) {
+                    if (!versions.isEmpty())
+                        return versions.get(versions.size() - 1).val;
+                }
+            }
+
+            MvccValue committed = mainIdx.get(key);
+
+            return committed == null ? null : committed.val;
+        }
+
+        /**
+         * Scans the SQL index and collects, per key, the row visible at the given query version: its own version
+         * must be visible and it must not have a superseding version that is visible too.
+         * <p>
+         * Finding a second non-null value for the same key is treated as a fatal inconsistency and is reported
+         * through {@code TestDebugLog.printAllAndExit}.
+         *
+         * @param qryVer Query version.
+         * @return Visible key-value pairs.
+         */
+        Map<Object, Object> sqlQuery(MvccQueryVersion qryVer) {
+            Map<Object, Object> res = new HashMap<>();
+
+            for (Map.Entry<SqlKey, MvccSqlValue> row : mvccSqlIdx.entrySet()) {
+                Object rowKey = row.getKey().key;
+
+                MvccSqlValue rowVal = row.getValue();
+
+                if (!versionVisible(rowVal.ver, qryVer)) {
+                    if (DEBUG_LOG)
+                        TestDebugLog.msgs.add(new TestDebugLog.Msg3("sql skip mvcc val", rowKey, rowVal.val, rowVal.ver));
+
+                    continue;
+                }
+
+                MvccUpdateVersion supersededBy = rowVal.newVer;
+
+                if (supersededBy != null && versionVisible(supersededBy, qryVer)) {
+                    if (DEBUG_LOG) {
+                        TestDebugLog.msgs.add(
+                            new TestDebugLog.Msg4("sql skip mvcc val2", rowKey, rowVal.val, rowVal.ver, rowVal.newVer));
+                    }
+
+                    continue;
+                }
+
+                Object dup = res.put(rowKey, rowVal.val);
+
+                if (dup != null) {
+                    TestDebugLog.printAllAndExit("Already has value for key [key=" + rowKey +
+                        ", qryVer=" + qryVer +
+                        ", oldVal=" + dup +
+                        ", newVal=" + rowVal.val +
+                        ']');
+                }
+
+                assert dup == null;
+            }
+
+            return res;
+        }
+
+        /**
+         * Visibility is decided by counters only; the query's set of active transactions is not consulted.
+         *
+         * @param ver Version of a value.
+         * @param qryVer Query version.
+         * @return {@code true} if the value's counter is not greater than the query counter.
+         */
+        private boolean versionVisible(MvccUpdateVersion ver, MvccQueryVersion qryVer) {
+            return ver.cntr.compareTo(qryVer.cntr) <= 0;
+        }
+
+        /**
+         * Reads the value of a key as of the given query version: the newest visible element of the key's
+         * version list wins, otherwise the {@code mainIdx} value is returned.
+         *
+         * @param key Key.
+         * @param ver Query version.
+         * @return Visible value, or {@code null} if there is none.
+         */
+        Object get(Object key, MvccQueryVersion ver) {
+            List<MvccValue> versions = mvccIdx.get(key);
+
+            if (versions != null) {
+                synchronized (versions) {
+                    int idx = versions.size();
+
+                    while (--idx >= 0) {
+                        MvccValue cand = versions.get(idx);
+
+                        if (versionVisible(cand.ver, ver)) {
+                            if (DEBUG_LOG)
+                                TestDebugLog.msgs.add(new TestDebugLog.Msg3("read mvcc val", key, cand.val, cand.ver));
+
+                            return cand.val;
+                        }
+                    }
+                }
+            }
+
+            MvccValue committed = mainIdx.get(key);
+
+            if (committed == null) {
+                if (DEBUG_LOG)
+                    TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted null", key, null, null));
+
+                return null;
+            }
+
+            int cmp = committed.ver.cntr.compareTo(ver.cntr);
+
+            // A mainIdx value newer than the query version is reported as an error.
+            if (DEBUG_LOG && cmp > 0) {
+                synchronized (TestDebugLog.msgs) {
+                    TestDebugLog.msgs.add(new TestDebugLog.Message(
+                        "Committed [key=" + key + ", ver=" + committed.ver + ", qryVer=" + ver.cntr + ']'));
+
+                    TestDebugLog.printAllAndExit(
+                        "Committed [key=" + key + ", ver=" + committed.ver + ", qryVer=" + ver + ']');
+                }
+            }
+
+            assert cmp <= 0 : "Committed [ver=" + committed.ver + ", qryVer=" + ver.cntr + ']';
+
+            if (DEBUG_LOG)
+                TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted val", key, committed, committed.ver));
+
+            return committed.val;
+        }
+
+        /**
+         * @param key Key.
+         * @return Lock of the key; created and published atomically on first use.
+         */
+        private ReentrantLock lock(Object key) {
+            ReentrantLock existing = locks.get(key);
+
+            if (existing != null)
+                return existing;
+
+            ReentrantLock created = new ReentrantLock();
+
+            ReentrantLock raced = locks.putIfAbsent(key, created);
+
+            return raced != null ? raced : created;
+        }
+ }
+
+    /**
+     * Value together with the version that wrote it.
+     */
+    static class MvccValue {
+        /** Value; {@code null} for a removed entry. */
+        @GridToStringInclude
+        final Object val;
+
+        /** Version that wrote the value. */
+        @GridToStringInclude
+        final MvccUpdateVersion ver;
+
+        /**
+         * @param val Value.
+         * @param ver Version; must not be {@code null}.
+         */
+        MvccValue(Object val, MvccUpdateVersion ver) {
+            assert ver != null;
+
+            this.ver = ver;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            final String str = S.toString(MvccValue.class, this);
+
+            return str;
+        }
+    }
+
+    /**
+     * Row of the SQL index: a value, the version that wrote it and, once superseded, the version that replaced it.
+     */
+    static class MvccSqlValue {
+        /** Value; {@code null} for a removed entry. */
+        @GridToStringInclude
+        final Object val;
+
+        /** Version that wrote the value. */
+        @GridToStringInclude
+        final MvccUpdateVersion ver;
+
+        /** Version of the update that superseded this value, or {@code null} if there is none. */
+        @GridToStringInclude
+        final MvccUpdateVersion newVer;
+
+        /**
+         * @param val Value.
+         * @param ver Version; must not be {@code null}.
+         * @param newVer Superseding version, or {@code null}.
+         */
+        MvccSqlValue(Object val, MvccUpdateVersion ver, MvccUpdateVersion newVer) {
+            assert ver != null;
+
+            this.newVer = newVer;
+            this.ver = ver;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            final String str = S.toString(MvccSqlValue.class, this);
+
+            return str;
+        }
+    }
+
+    /**
+     * Prints the message to standard output, prefixed with the current thread.
+     *
+     * @param msg Message.
+     */
+    static void log(String msg) {
+        String line = Thread.currentThread() + ": " + msg;
+
+        System.out.println(line);
+    }
+
+ static class TestDebugLog {
+ /** Collected debug messages, in insertion order. */
+ //static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(1_000_000));
+ static final ConcurrentLinkedQueue<Object> msgs = new ConcurrentLinkedQueue<>();
+
+
+
+ /** Timestamp format used by the messages' toString(). NOTE(review): SimpleDateFormat is not thread-safe and this instance is shared. */
+ private static final SimpleDateFormat DEBUG_DATE_FMT = new SimpleDateFormat("HH:mm:ss,SSS");
+
+ static class Message {
+ String thread = Thread.currentThread().getName();
+
+ String msg;
+
+ long ts = U.currentTimeMillis();
+
+ public Message(String msg) {
+ this.msg = msg;
+ }
+
+ public String toString() {
+ return "Msg [msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+        /**
+         * Debug message with two attached values.
+         */
+        static class Msg2 extends Message {
+            /** First value. */
+            Object v1;
+
+            /** Second value. */
+            Object v2;
+
+            /**
+             * @param msg Message text.
+             * @param v1 First value.
+             * @param v2 Second value.
+             */
+            public Msg2(String msg, Object v1, Object v2) {
+                super(msg);
+
+                this.v1 = v1;
+                this.v2 = v2;
+            }
+
+            /** {@inheritDoc} */
+            @Override public String toString() {
+                // The message text is printed once, in line with the other MsgN classes.
+                return "Msg [msg=" + msg +
+                    ", v1=" + v1 +
+                    ", v2=" + v2 +
+                    ", thread=" + thread +
+                    ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+            }
+        }
+
+ static class Msg3 extends Message{
+ Object v1;
+ Object v2;
+ Object v3;
+
+ public Msg3(String msg, Object v1, Object v2, Object v3) {
+ super(msg);
+ this.v1 = v1;
+ this.v2 = v2;
+ this.v3 = v3;
+ }
+ public String toString() {
+ return "Msg [msg=" + msg +
+ ", v1=" + v1 +
+ ", v2=" + v2 +
+ ", v3=" + v3 +
+ ", thread=" + thread +
+ ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static class Msg4 extends Message{
+ Object v1;
+ Object v2;
+ Object v3;
+ Object v4;
+
+ public Msg4(String msg, Object v1, Object v2, Object v3, Object v4) {
+ super(msg);
+ this.v1 = v1;
+ this.v2 = v2;
+ this.v3 = v3;
+ this.v4 = v4;
+ }
+
+ public String toString() {
+ return "Msg [msg=" + msg +
+ ", v1=" + v1 +
+ ", v2=" + v2 +
+ ", v3=" + v3 +
+ ", v4=" + v4 +
+ ", thread=" + thread +
+ ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static class Msg6 extends Message{
+ Object v1;
+ Object v2;
+ Object v3;
+ Object v4;
+ Object v5;
+ Object v6;
+
+ public Msg6(String msg, Object v1, Object v2, Object v3, Object v4, Object v5, Object v6) {
+ super(msg);
+ this.v1 = v1;
+ this.v2 = v2;
+ this.v3 = v3;
+ this.v4 = v4;
+ this.v5 = v5;
+ this.v6 = v6;
+ }
+
+ public String toString() {
+ return "Msg [msg=" + msg +
+ ", txId=" + v1 +
+ ", id1=" + v2 +
+ ", v1=" + v3 +
+ ", id2=" + v4 +
+ ", v2=" + v5 +
+ ", cntr=" + v6 +
+ ", thread=" + thread +
+ ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+ static class Msg6_1 extends Message{
+ Object v1;
+ Object v2;
+ Object v3;
+ Object v4;
+ Object v5;
+ Object v6;
+
+ public Msg6_1(String msg, Object v1, Object v2, Object v3, Object v4, Object v5, Object v6) {
+ super(msg);
+ this.v1 = v1;
+ this.v2 = v2;
+ this.v3 = v3;
+ this.v4 = v4;
+ this.v5 = v5;
+ this.v6 = v6;
+ }
+
+ public String toString() {
+ return "Msg [msg=" + msg +
+ ", key=" + v1 +
+ ", val=" + v2 +
+ ", ver=" + v3 +
+ ", cleanupC=" + v4 +
+ ", thread=" + thread +
+ ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static class EntryMessage extends Message {
+ Object key;
+ Object val;
+
+ public EntryMessage(Object key, Object val, String msg) {
+ super(msg);
+
+ this.key = key;
+ this.val = val;
+ }
+
+ public String toString() {
+ return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static class PartMessage extends Message {
+ int p;
+ Object val;
+
+ public PartMessage(int p, Object val, String msg) {
+ super(msg);
+
+ this.p = p;
+ this.val = val;
+ }
+
+ public String toString() {
+ return "PartMessage [p=" + p + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static final boolean out = false;
+
+ public static void addMessage(String msg) {
+ msgs.add(new Message(msg));
+
+ if (out)
+ System.out.println(msg);
+ }
+
+ /**
+  * Records a message for a cache entry. A {@code KeyCacheObject} key is replaced by the result of
+  * its {@code value(null, false)} call before it is stored.
+  *
+  * @param key Key.
+  * @param val Value.
+  * @param msg Message text.
+  */
+ public static void addEntryMessage(Object key, Object val, String msg) {
+     Object key0 = key;
+
+     if (key0 instanceof KeyCacheObject)
+         key0 = ((KeyCacheObject)key0).value(null, false);
+
+     EntryMessage entryMsg = new EntryMessage(key0, val, msg);
+
+     msgs.add(entryMsg);
+
+     if (!out)
+         return;
+
+     System.out.println(entryMsg.toString());
+
+     System.out.flush();
+ }
+
+ /**
+  * Records a message for a partition and, when {@code out} is set, echoes it to standard output.
+  *
+  * @param p Partition.
+  * @param val Value.
+  * @param msg Message text.
+  */
+ public static void addPartMessage(int p, Object val, String msg) {
+     PartMessage partMsg = new PartMessage(p, val, msg);
+
+     msgs.add(partMsg);
+
+     if (!out)
+         return;
+
+     System.out.println(partMsg.toString());
+
+     System.out.flush();
+ }
+
+ /**
+  * Prints and records the given message, dumps the recorded messages to {@code test_debug.log},
+  * {@code test_debug_update.txt} and {@code test_debug_thread.txt}, then terminates the JVM
+  * with exit code 1.
+  *
+  * @param msg Final message.
+  */
+ static void printAllAndExit(String msg) {
+     System.out.println(msg);
+
+     TestDebugLog.addMessage(msg);
+
+     // The full dump also returns the snapshot that the two filtered dumps are built from.
+     List<Object> dumped = TestDebugLog.printMessages(true, null);
+
+     String curThread = Thread.currentThread().getName();
+
+     TestDebugLog.printMessages0(dumped, "test_debug_update.txt");
+
+     TestDebugLog.printMessagesForThread(dumped, curThread, "test_debug_thread.txt");
+
+     System.exit(1);
+ }
+
+ public static void printMessagesForThread(List<Object> msgs0, String thread0, String file) {
+ try {
+ FileOutputStream out = new FileOutputStream(file);
+
+ PrintWriter w = new PrintWriter(out);
+
+ for (Object msg : msgs0) {
+ if (msg instanceof Message) {
+ String thread = ((Message) msg).thread;
+
+ if (thread.equals(thread0))
+ w.println(msg.toString());
+ }
+ }
+
+ w.close();
+
+ out.close();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void printMessages0(List<Object> msgs0, String file) {
+ try {
+ FileOutputStream out = new FileOutputStream(file);
+
+ PrintWriter w = new PrintWriter(out);
+
+ for (Object msg : msgs0) {
+ if (msg instanceof Message) {
+ String msg0 = ((Message) msg).msg;
+
+ if (msg0.equals("tx done") || msg0.equals("update") || msg0.equals("cleanup"))
+ w.println(msg.toString());
+ }
+ }
+
+ w.close();
+
+ out.close();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static List<Object> printMessages(boolean file, Integer part) {
+ List<Object> msgs0;
+
+ synchronized (msgs) {
+ msgs0 = new ArrayList<>(msgs);
+
+ msgs.clear();
+ }
+
+ if (file) {
+ try {
+ FileOutputStream out = new FileOutputStream("test_debug.log");
+
+ PrintWriter w = new PrintWriter(out);
+
+ for (Object msg : msgs0) {
+ if (part != null && msg instanceof PartMessage) {
+ if (((PartMessage) msg).p != part)
+ continue;
+ }
+
+ w.println(msg.toString());
+ }
+
+ w.close();
+
+ out.close();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ else {
+ for (Object msg : msgs0)
+ System.out.println(msg);
+ }
+
+ return msgs0;
+ }
+
+ public static void printKeyMessages(boolean file, Object key) {
+ List<Object> msgs0;
+
+ synchronized (msgs) {
+ msgs0 = new ArrayList<>(msgs);
+
+ msgs.clear();
+ }
+
+ if (file) {
+ try {
+ FileOutputStream out = new FileOutputStream("test_debug.log");
+
+ PrintWriter w = new PrintWriter(out);
+
+ for (Object msg : msgs0) {
+ if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key))
+ continue;
+
+ w.println(msg.toString());
+ }
+
+ w.close();
+
+ out.close();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ else {
+ for (Object msg : msgs0) {
+ if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key))
+ continue;
+
+ System.out.println(msg);
+ }
+ }
+ }
+
+ /**
+ * Removes all recorded messages.
+ */
+ public static void clear() {
+ msgs.clear();
+ }
+
+ /**
+  * Removes all recorded {@code EntryMessage}s and keeps every other message.
+  */
+ public static void clearEntries() {
+     // Hold the same monitor the print methods take while copying and clearing the log,
+     // so that this traversal does not race with them.
+     synchronized (msgs) {
+         for (Iterator<?> it = msgs.iterator(); it.hasNext();) {
+             Object msg = it.next();
+
+             if (msg instanceof EntryMessage)
+                 it.remove();
+         }
+     }
+ }
+
+ }}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 97e06bf..eae435e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -102,6 +102,12 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorMvccCounterResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryCounterRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -875,6 +881,36 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 129:
+ msg = new CoordinatorTxCounterRequest();
+
+ break;
+
+ case 130:
+ msg = new CoordinatorMvccCounterResponse();
+
+ break;
+
+ case 131:
+ msg = new CoordinatorTxAckRequest();
+
+ break;
+
+ case 132:
+ msg = new CoordinatorTxAckResponse();
+
+ break;
+
+ case 133:
+ msg = new CoordinatorQueryCounterRequest();
+
+ break;
+
+ case 134:
+ msg = new CoordinatorQueryAckRequest();
+
+ break;
+
// [-3..119] [124..128] [-23..-27] [-36..-55]- this
// [120..123] - DR
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 5e2c8db..19bd05d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -237,6 +237,9 @@ class ClusterCachesInfo {
CU.checkAttributeMismatch(log, rmtAttr.groupName(), rmt, "groupName", "Cache group name",
locAttr.groupName(), rmtAttr.groupName(), true);
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "mvccEnabled", "MVCC mode",
+ locAttr.mvccEnabled(), rmtAttr.mvccEnabled(), true);
+
if (rmtAttr.cacheMode() != LOCAL) {
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor",
locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
index d64ee8b..c1f03fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
@@ -329,6 +329,13 @@ public class GridCacheAttributes implements Serializable {
}
/**
+ * @return MVCC enabled flag.
+ */
+ public boolean mvccEnabled() {
+ return ccfg.isMvccEnabled();
+ }
+
+ /**
* @param obj Object to get class of.
* @return Class name or {@code null}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index b6faf47..b504625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2033,6 +2033,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return {@code True} if mvcc is enabled for cache.
+ */
+ public boolean mvccEnabled() {
+ return cacheCfg.isMvccEnabled();
+ }
+
+ /**
* @param part Partition.
* @param affNodes Affinity nodes.
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index bd950fa..030f7e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
@@ -456,10 +457,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.igfsHelper().validateCacheConfiguration(cc);
- if (cc.getAtomicityMode() == ATOMIC)
+ if (cc.getAtomicityMode() == ATOMIC) {
assertParameter(cc.getTransactionManagerLookupClassName() == null,
"transaction manager can not be used with ATOMIC cache");
+ assertParameter(!cc.isMvccEnabled(), "MVCC can not used with ATOMIC cache");
+ }
+
+ if (cc.getCacheMode() == LOCAL)
+ assertParameter(!cc.isMvccEnabled(), "MVCC can not used with LOCAL cache");
+
if (cc.getEvictionPolicy() != null && !cc.isOnheapCacheEnabled())
throw new IgniteCheckedException("Onheap cache must be enabled if eviction policy is configured [cacheName="
+ U.maskName(cc.getName()) + "]");
@@ -2170,6 +2177,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@SuppressWarnings("unchecked")
private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
Collection<CacheStoreSessionListener> storeSesLsnrs) throws IgniteCheckedException {
+ CacheCoordinatorsSharedManager coord = new CacheCoordinatorsSharedManager();
IgniteTxManager tm = new IgniteTxManager();
GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
GridCacheVersionManager verMgr = new GridCacheVersionManager();
@@ -2208,6 +2216,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return new GridCacheSharedContext(
kernalCtx,
+ coord,
tm,
verMgr,
mvccMgr,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 82d960a..09c8b1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -122,6 +123,9 @@ public class GridCacheSharedContext<K, V> {
/** Ttl cleanup manager. */
private GridCacheSharedTtlCleanupManager ttlMgr;
+ /** Cache mvcc coordinator. */
+ private CacheCoordinatorsSharedManager coord;
+
/** Cache contexts map. */
private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap;
@@ -163,6 +167,7 @@ public class GridCacheSharedContext<K, V> {
/**
* @param kernalCtx Context.
+ * @param coord Cache mvcc coordinator manager.
* @param txMgr Transaction manager.
* @param verMgr Version manager.
* @param mvccMgr MVCC manager.
@@ -176,6 +181,7 @@ public class GridCacheSharedContext<K, V> {
*/
public GridCacheSharedContext(
GridKernalContext kernalCtx,
+ CacheCoordinatorsSharedManager coord,
IgniteTxManager txMgr,
GridCacheVersionManager verMgr,
GridCacheMvccManager mvccMgr,
@@ -193,7 +199,21 @@ public class GridCacheSharedContext<K, V> {
) {
this.kernalCtx = kernalCtx;
- setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, pageStoreMgr, walMgr, dbMgr, snpMgr, depMgr, exchMgr, affMgr, ioMgr, ttlMgr);
+ setManagers(mgrs,
+ coord,
+ txMgr,
+ jtaMgr,
+ verMgr,
+ mvccMgr,
+ pageStoreMgr,
+ walMgr,
+ dbMgr,
+ snpMgr,
+ depMgr,
+ exchMgr,
+ affMgr,
+ ioMgr,
+ ttlMgr);
this.storeSesLsnrs = storeSesLsnrs;
@@ -335,7 +355,9 @@ public class GridCacheSharedContext<K, V> {
void onReconnected(boolean active) throws IgniteCheckedException {
List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
- setManagers(mgrs, txMgr,
+ setManagers(mgrs,
+ coord,
+ txMgr,
jtaMgr,
verMgr,
mvccMgr,
@@ -374,6 +396,7 @@ public class GridCacheSharedContext<K, V> {
/**
* @param mgrs Managers list.
+ * @param coord Cache mvcc coordinator manager.
* @param txMgr Transaction manager.
* @param jtaMgr JTA manager.
* @param verMgr Version manager.
@@ -385,6 +408,7 @@ public class GridCacheSharedContext<K, V> {
* @param ttlMgr Ttl cleanup manager.
*/
private void setManagers(List<GridCacheSharedManager<K, V>> mgrs,
+ CacheCoordinatorsSharedManager coord,
IgniteTxManager txMgr,
CacheJtaManagerAdapter jtaMgr,
GridCacheVersionManager verMgr,
@@ -398,6 +422,7 @@ public class GridCacheSharedContext<K, V> {
CacheAffinitySharedManager affMgr,
GridCacheIoManager ioMgr,
GridCacheSharedTtlCleanupManager ttlMgr) {
+ this.coord = add(mgrs, coord);
this.mvccMgr = add(mgrs, mvccMgr);
this.verMgr = add(mgrs, verMgr);
this.txMgr = add(mgrs, txMgr);
@@ -738,6 +763,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Cache mvcc coordinator manager.
+ */
+ public CacheCoordinatorsSharedManager coordinators() {
+ return coord;
+ }
+
+ /**
* @return Node ID.
*/
public UUID localNodeId() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
new file mode 100644
index 0000000..e5d07ea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ *
+ */
+public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
+ /** */
+ private final AtomicLong mvccCntr = new AtomicLong(0L);
+
+ /** */
+ private final AtomicLong committedCntr = new AtomicLong(0L);
+
+ /** */
+ private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new ConcurrentHashMap<>();
+
+ /** */
+ private final ConcurrentMap<Long, MvccCounterFuture> cntrFuts = new ConcurrentHashMap<>();
+
+ /** */
+ private final ConcurrentMap<Long, TxAckFuture> ackFuts = new ConcurrentHashMap<>();
+
+ /** */
+ private final AtomicLong futIdCntr = new AtomicLong();
+
+ /** {@inheritDoc} */
+ @Override protected void start0() throws IgniteCheckedException {
+ super.start0();
+
+ // Node failed/left events are forwarded to the pending counter futures by the discovery listener.
+ cctx.gridEvents().addLocalEventListener(new CacheCoordinatorDiscoveryListener(),
+ EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ // All coordinator requests and responses arrive on a single dedicated topic.
+ cctx.gridIO().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener());
+ }
+
+ /**
+  * Sends a transaction counter request to the coordinator.
+  *
+  * @param crd Coordinator.
+  * @param txId Transaction ID.
+  * @return Counter request future.
+  */
+ public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, GridCacheVersion txId) {
+     long futId = futIdCntr.incrementAndGet();
+
+     MvccCounterFuture cntrFut = new MvccCounterFuture(futId, crd);
+
+     // Register the future before sending so that the response handler can look it up by ID.
+     cntrFuts.put(cntrFut.id, cntrFut);
+
+     try {
+         cctx.gridIO().sendToGridTopic(crd,
+             TOPIC_CACHE_COORDINATOR,
+             new CoordinatorTxCounterRequest(cntrFut.id, txId),
+             SYSTEM_POOL);
+     }
+     catch (IgniteCheckedException e) {
+         // Complete with the error only if nobody else has removed the future in the meantime.
+         boolean rmvd = cntrFuts.remove(cntrFut.id) != null;
+
+         if (rmvd)
+             cntrFut.onDone(e);
+     }
+
+     return cntrFut;
+ }
+
+ /**
+  * Sends a query counter request to the coordinator.
+  *
+  * @param crd Coordinator.
+  * @return Counter request future.
+  */
+ public IgniteInternalFuture<Long> requestQueryCounter(ClusterNode crd) {
+     long futId = futIdCntr.incrementAndGet();
+
+     MvccCounterFuture cntrFut = new MvccCounterFuture(futId, crd);
+
+     // Register the future before sending so that the response handler can look it up by ID.
+     cntrFuts.put(cntrFut.id, cntrFut);
+
+     try {
+         cctx.gridIO().sendToGridTopic(crd,
+             TOPIC_CACHE_COORDINATOR,
+             new CoordinatorQueryCounterRequest(cntrFut.id),
+             SYSTEM_POOL);
+     }
+     catch (IgniteCheckedException e) {
+         // Complete with the error only if nobody else has removed the future in the meantime.
+         boolean rmvd = cntrFuts.remove(cntrFut.id) != null;
+
+         if (rmvd)
+             cntrFut.onDone(e);
+     }
+
+     return cntrFut;
+ }
+
+ /**
+  * Sends a transaction acknowledge request to the coordinator.
+  *
+  * @param crd Coordinator.
+  * @param txId Transaction ID.
+  * @return Acknowledge future.
+  */
+ public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, GridCacheVersion txId) {
+     TxAckFuture fut = new TxAckFuture(futIdCntr.incrementAndGet(), crd);
+
+     ackFuts.put(fut.id, fut);
+
+     try {
+         cctx.gridIO().sendToGridTopic(crd,
+             TOPIC_CACHE_COORDINATOR,
+             new CoordinatorTxAckRequest(fut.id, txId),
+             SYSTEM_POOL);
+     }
+     catch (ClusterTopologyCheckedException e) {
+         // The future lives in ackFuts (see put() above). Removing it from cntrFuts, as the code
+         // used to do, never matched, so the future was neither completed nor unregistered.
+         if (ackFuts.remove(fut.id) != null)
+             fut.onDone();
+     }
+     catch (IgniteCheckedException e) {
+         if (ackFuts.remove(fut.id) != null)
+             fut.onDone(e);
+     }
+
+     return fut;
+ }
+
+ /**
+  * Sends a transaction acknowledge request with the skip-response flag set, so no reply is
+  * expected. Send failures are only logged.
+  *
+  * @param crd Coordinator.
+  * @param txId Transaction ID.
+  */
+ public void ackTxRollback(ClusterNode crd, GridCacheVersion txId) {
+     // No future is registered for this request, hence the zero future ID.
+     CoordinatorTxAckRequest req = new CoordinatorTxAckRequest(0, txId);
+
+     req.skipResponse(true);
+
+     try {
+         cctx.gridIO().sendToGridTopic(crd, TOPIC_CACHE_COORDINATOR, req, SYSTEM_POOL);
+     }
+     catch (ClusterTopologyCheckedException e) {
+         if (log.isDebugEnabled())
+             log.debug("Failed to send tx rollback ack, node left [msg=" + req + ", node=" + crd.id() + ']');
+     }
+     catch (IgniteCheckedException e) {
+         U.error(log, "Failed to send tx rollback ack [msg=" + req + ", node=" + crd.id() + ']', e);
+     }
+ }
+
+
+ /**
+  * Takes the next value of the MVCC counter and records it for the transaction in the
+  * active transactions map.
+  *
+  * @param txId Transaction ID.
+  * @return Counter.
+  */
+ private long assignTxCounter(GridCacheVersion txId) {
+     long cntr = mvccCntr.incrementAndGet();
+
+     Long prev = activeTxs.put(txId, cntr);
+
+     // A transaction is expected to request its counter only once.
+     assert prev == null : txId;
+
+     return cntr;
+ }
+
+ /**
+  * Assigns a counter to the requesting transaction and sends it back to the sender. The request
+  * is ignored if the sender is no longer known to discovery.
+  *
+  * @param nodeId Sender node ID.
+  * @param msg Message.
+  */
+ private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) {
+     ClusterNode sndNode = cctx.discovery().node(nodeId);
+
+     if (sndNode == null) {
+         if (log.isDebugEnabled())
+             log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
+
+         return;
+     }
+
+     long txCntr = assignTxCounter(msg.txId());
+
+     CoordinatorMvccCounterResponse res = new CoordinatorMvccCounterResponse(txCntr, msg.futureId());
+
+     try {
+         cctx.gridIO().sendToGridTopic(sndNode, TOPIC_CACHE_COORDINATOR, res, SYSTEM_POOL);
+     }
+     catch (ClusterTopologyCheckedException e) {
+         if (log.isDebugEnabled())
+             log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
+     }
+     catch (IgniteCheckedException e) {
+         U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e);
+     }
+ }
+
+ /**
+  * Completes the counter future the response is addressed to. If no such future is registered,
+  * this is logged as a warning when the sender is alive and at debug level otherwise.
+  *
+  * @param nodeId Sender node ID.
+  * @param msg Message.
+  */
+ private void processCoordinatorCounterResponse(UUID nodeId, CoordinatorMvccCounterResponse msg) {
+     MvccCounterFuture cntrFut = cntrFuts.remove(msg.futureId());
+
+     if (cntrFut != null) {
+         cntrFut.onResponse(msg.counter());
+
+         return;
+     }
+
+     if (cctx.discovery().alive(nodeId))
+         U.warn(log, "Failed to find coordinator counter future [node=" + nodeId + ", msg=" + msg + ']');
+     else if (log.isDebugEnabled())
+         log.debug("Failed to find query counter future [node=" + nodeId + ", msg=" + msg + ']');
+ }
+ /**
+  * Assigns a counter for a query and sends it back to the sender. The request is ignored if the
+  * sender is no longer known to discovery; if the response cannot be sent, the query is reported
+  * as done right away.
+  *
+  * @param nodeId Sender node ID.
+  * @param msg Message.
+  */
+ private void processCoordinatorQueryStateRequest(UUID nodeId, CoordinatorQueryCounterRequest msg) {
+     ClusterNode node = cctx.discovery().node(nodeId);
+
+     if (node == null) {
+         if (log.isDebugEnabled())
+             log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
+
+         return;
+     }
+
+     long qryCntr = assignQueryCounter(nodeId);
+
+     // Argument order is (counter, future ID). The arguments used to be swapped here, so the
+     // requester received the counter as a future ID and could never match its pending future.
+     CoordinatorMvccCounterResponse res = new CoordinatorMvccCounterResponse(qryCntr, msg.futureId());
+
+     try {
+         cctx.gridIO().sendToGridTopic(node,
+             TOPIC_CACHE_COORDINATOR,
+             res,
+             SYSTEM_POOL);
+     }
+     catch (ClusterTopologyCheckedException e) {
+         if (log.isDebugEnabled())
+             log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
+
+         onQueryDone(qryCntr);
+     }
+     catch (IgniteCheckedException e) {
+         U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
+
+         onQueryDone(qryCntr);
+     }
+ }
+
+ /**
+ * Handles a query acknowledge request by passing the counter it carries to {@code onQueryDone}.
+ *
+ * @param msg Message.
+ */
+ private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest msg) {
+ onQueryDone(msg.counter());
+ }
+
+ /**
+  * Removes the transaction from the active transactions map and, unless the request asks to skip
+  * the response, sends an acknowledge response back to the sender.
+  *
+  * @param nodeId Sender node ID.
+  * @param msg Message.
+  */
+ private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+     activeTxs.remove(msg.txId());
+
+     if (msg.skipResponse())
+         return;
+
+     try {
+         cctx.gridIO().sendToGridTopic(nodeId,
+             TOPIC_CACHE_COORDINATOR,
+             new CoordinatorTxAckResponse(msg.futureId()),
+             SYSTEM_POOL);
+     }
+     catch (ClusterTopologyCheckedException e) {
+         if (log.isDebugEnabled())
+             log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
+     }
+     catch (IgniteCheckedException e) {
+         U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
+     }
+ }
+
+ /**
+  * Completes the acknowledge future the response is addressed to. If no such future is
+  * registered, this is logged as a warning when the sender is alive and at debug level otherwise.
+  *
+  * @param nodeId Sender node ID.
+  * @param msg Message.
+  */
+ private void processCoordinatorTxAckResponse(UUID nodeId, CoordinatorTxAckResponse msg) {
+     // Remove rather than get: a completed future must not stay in the map, otherwise the map
+     // grows by one entry for every acknowledged transaction.
+     TxAckFuture fut = ackFuts.remove(msg.futureId());
+
+     if (fut != null)
+         fut.onResponse();
+     else {
+         if (cctx.discovery().alive(nodeId))
+             U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
+         else if (log.isDebugEnabled())
+             log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
+     }
+ }
+
+ /**
+ * Currently returns the present value of {@code committedCntr}; {@code qryNodeId} is not used yet.
+ *
+ * @param qryNodeId Node initiated query.
+ * @return Counter for query.
+ */
+ private long assignQueryCounter(UUID qryNodeId) {
+ // TODO IGNITE-3478
+ return committedCntr.get();
+ }
+
+ /**
+ * Placeholder for query completion handling: the body is empty, so the call has no effect yet.
+ *
+ * @param cntr Query counter.
+ */
+ private void onQueryDone(long cntr) {
+ // TODO IGNITE-3478
+ }
+
+ /**
+  * Picks the coordinator for the given topology: currently the first node in the server nodes list.
+  *
+  * @param discoCache Cluster topology.
+  * @return Assigned coordinator or {@code null} if there are no server nodes.
+  */
+ @Nullable public ClusterNode assignCoordinator(DiscoCache discoCache) {
+     // TODO IGNITE-3478
+     List<ClusterNode> srvs = discoCache.serverNodes();
+
+     if (srvs.isEmpty())
+         return null;
+
+     return srvs.get(0);
+ }
+
+ /**
+  * Future for a counter request sent to a coordinator node.
+  */
+ private class MvccCounterFuture extends GridFutureAdapter<Long> {
+     /** Future ID. */
+     private final Long id;
+
+     /** Coordinator the request was sent to. */
+     private final ClusterNode crd;
+
+     /**
+      * @param id Future ID.
+      * @param crd Coordinator.
+      */
+     MvccCounterFuture(Long id, ClusterNode crd) {
+         this.crd = crd;
+         this.id = id;
+     }
+
+     /**
+      * Completes the future with the received counter.
+      *
+      * @param cntr Counter.
+      */
+     void onResponse(long cntr) {
+         onDone(cntr);
+     }
+
+     /**
+      * Fails the future if the given node is its coordinator and the future is still registered.
+      *
+      * @param nodeId Failed node ID.
+      */
+     void onNodeLeft(UUID nodeId) {
+         if (!crd.id().equals(nodeId))
+             return;
+
+         if (cntrFuts.remove(id) != null)
+             onDone(new ClusterTopologyCheckedException("Failed to request counter, node failed: " + nodeId));
+     }
+ }
+
+ /**
+  * Future for a transaction acknowledge request sent to a coordinator node.
+  */
+ private class TxAckFuture extends GridFutureAdapter<Void> {
+     /** Future ID. */
+     private final Long id;
+
+     /** Coordinator the request was sent to. */
+     private final ClusterNode crd;
+
+     /**
+      * @param id Future ID.
+      * @param crd Coordinator.
+      */
+     TxAckFuture(Long id, ClusterNode crd) {
+         this.id = id;
+         this.crd = crd;
+     }
+
+     /**
+      * Completes the future.
+      */
+     void onResponse() {
+         onDone();
+     }
+
+     /**
+      * Completes the future (without an error) if the given node is its coordinator and the
+      * future is still registered.
+      *
+      * @param nodeId Failed node ID.
+      */
+     void onNodeLeft(UUID nodeId) {
+         // Acknowledge futures are kept in ackFuts; looking the ID up in cntrFuts never matched,
+         // so the future could not be completed from here.
+         if (crd.id().equals(nodeId) && ackFuts.remove(id) != null)
+             onDone();
+     }
+ }
+
+ /**
+  * Forwards node failed/left events to all pending coordinator futures.
+  */
+ private class CacheCoordinatorDiscoveryListener implements GridLocalEventListener {
+     /** {@inheritDoc} */
+     @Override public void onEvent(Event evt) {
+         assert evt instanceof DiscoveryEvent : evt;
+
+         DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+         UUID nodeId = discoEvt.eventNode().id();
+
+         for (MvccCounterFuture fut : cntrFuts.values())
+             fut.onNodeLeft(nodeId);
+
+         // Acknowledge futures have to be notified as well so that they get a chance to complete
+         // when their coordinator leaves; this loop used to be commented out.
+         for (TxAckFuture fut : ackFuts.values())
+             fut.onNodeLeft(nodeId);
+     }
+ }
+ /**
+  * Dispatches messages received on the coordinator topic to the matching handler by message type.
+  */
+ private class CoordinatorMessageListener implements GridMessageListener {
+     /** {@inheritDoc} */
+     @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+         if (msg instanceof CoordinatorTxCounterRequest) {
+             processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
+
+             return;
+         }
+
+         if (msg instanceof CoordinatorMvccCounterResponse) {
+             processCoordinatorCounterResponse(nodeId, (CoordinatorMvccCounterResponse)msg);
+
+             return;
+         }
+
+         if (msg instanceof CoordinatorTxAckRequest) {
+             processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+
+             return;
+         }
+
+         if (msg instanceof CoordinatorTxAckResponse) {
+             processCoordinatorTxAckResponse(nodeId, (CoordinatorTxAckResponse)msg);
+
+             return;
+         }
+
+         if (msg instanceof CoordinatorQueryAckRequest) {
+             processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
+
+             return;
+         }
+
+         if (msg instanceof CoordinatorQueryCounterRequest) {
+             processCoordinatorQueryStateRequest(nodeId, (CoordinatorQueryCounterRequest)msg);
+
+             return;
+         }
+
+         U.warn(log, "Unexpected message received: " + msg);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java
new file mode 100644
index 0000000..5005477
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message that carries a counter value together with a future ID.
+ * <p>
+ * Two {@code long} fields are serialized, in the order {@code cntr}, {@code futId}.
+ * <p>
+ * NOTE(review): judging by the class name this is presumably the coordinator's reply to an
+ * MVCC counter request, with the future ID used to match the reply to a pending request
+ * future - confirm against the sender and receiver code.
+ */
+public class CoordinatorMvccCounterResponse implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Counter value carried by this response. */
+ private long cntr;
+
+ /** Future ID carried by this response. */
+ private long futId;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ * <p>
+ * Leaves both fields at their default value; {@link #readFrom(ByteBuffer, MessageReader)}
+ * is the method that assigns them.
+ */
+ public CoordinatorMvccCounterResponse() {
+ // No-op.
+ }
+
+ /**
+ * Creates a response with the given payload.
+ *
+ * @param cntr Counter.
+ * @param futId Future ID.
+ */
+ CoordinatorMvccCounterResponse(long cntr, long futId) {
+ this.cntr = cntr;
+ this.futId = futId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Counter.
+ */
+ public long counter() {
+ return cntr;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the message header (only while {@code writer.isHeaderWritten()} is {@code false}) and
+ * then the fields {@code cntr} and {@code futId}, in that order. The switch on
+ * {@code writer.state()} selects the first field still to be written and its cases fall through
+ * on purpose; the state is advanced only after a field was written successfully, so a call that
+ * returned {@code false} part-way can be repeated and continues with the pending field.
+ *
+ * @return {@code false} as soon as a writer call reports that it could not complete
+ * (presumably because {@code buf} has no room left - confirm), {@code true} otherwise.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("cntr", cntr))
+ return false;
+
+ writer.incrementState(); // No break: continues with the next field.
+
+ case 1:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Mirror image of {@link #writeTo(ByteBuffer, MessageWriter)}: reads {@code cntr} and then
+ * {@code futId}. The switch on {@code reader.state()} selects the first field still to be read
+ * and its cases fall through on purpose; the state is advanced only after
+ * {@code reader.isLastRead()} confirmed the read, so a call that returned {@code false} part-way
+ * can be repeated and continues with the pending field.
+ *
+ * @return {@code false} if {@code reader.beforeMessageRead()} fails or a field read is reported
+ * as not completed (presumably because {@code buf} does not hold enough bytes yet - confirm),
+ * otherwise the result of {@code reader.afterMessageRead(...)}.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cntr = reader.readLong("cntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState(); // No break: continues with the next field.
+
+ case 1:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorMvccCounterResponse.class);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return The constant {@code 130}; it is also the type passed to {@code writer.writeHeader(...)}
+ * by {@link #writeTo(ByteBuffer, MessageWriter)}. NOTE(review): presumably this has to match the
+ * type under which the class is registered in {@link GridIoMessageFactory} - confirm.
+ */
+ @Override public short directType() {
+ return 130;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code 2}, the number of fields handled by {@link #writeTo(ByteBuffer, MessageWriter)}
+ * and {@link #readFrom(ByteBuffer, MessageReader)}: {@code cntr} and {@code futId}.
+ */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code S.toString(...)} for this class.
+ */
+ @Override public String toString() {
+ return S.toString(CoordinatorMvccCounterResponse.class, this);
+ }
+}