You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/28 14:48:09 UTC
[8/8] ignite git commit: ignite-6149
ignite-6149
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6de99efa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6de99efa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6de99efa
Branch: refs/heads/ignite-6149
Commit: 6de99efa547f0fcd80946c2a3b9825f4b944b439
Parents: 57b9e8d
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 28 16:07:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 28 17:45:12 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/MvccTestApp.java | 369 +++++++++++++++----
1 file changed, 307 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6de99efa/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
index b99e805..fe36e1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
@@ -37,8 +37,11 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -53,17 +56,64 @@ public class MvccTestApp {
/** */
private static final boolean DEBUG_LOG = false;
+ public static void main0(String[] args) throws Exception {
+ final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(3);
+
+ final int ACCOUNTS = 3;
+
+ final int START_VAL = 100_000;
+
+ final Map<Object, Object> data = new TreeMap<>();
+
+ for (int i = 0; i < ACCOUNTS; i++)
+ data.put(i, START_VAL);
+
+ cluster.txPutAll(data);
+
+ cluster.txTransfer(0, 1, true);
+ cluster.txTransfer(0, 1, true);
+ cluster.txTransfer(0, 2, true);
+
+ Map<Object, Object> getData = cluster.getAll(data.keySet());
+
+ int sum = 0;
+
+ for (int i = 0; i < ACCOUNTS; i++) {
+ Integer val = (Integer)getData.get(i);
+
+ sum += val;
+
+ System.out.println("Val: " + val);
+ }
+
+ System.out.println("Sum: " + sum);
+
+ cluster.cleanup();
+
+ getData = cluster.getAll(data.keySet());
+
+ MvccQueryVersion ver1 = cluster.crd.queryVersion();
+ MvccQueryVersion ver2 = cluster.crd.queryVersion();
+
+ cluster.crd.queryDone(ver2.cntr);
+ cluster.crd.queryDone(ver1.cntr);
+
+ }
+
public static void main(String[] args) throws Exception {
final AtomicBoolean err = new AtomicBoolean();
- for (int iter = 0; iter < 10; iter++) {
- System.out.println("Iteration: " + iter);
+ final int READ_THREADS = 4;
+ final int UPDATE_THREADS = 6;
+ final int ACCOUNTS = 100;
- final TestCluster cluster = new TestCluster(1);
+ final int START_VAL = 1000;
- final int ACCOUNTS = 4;
+ for (int iter = 0; iter < 10; iter++) {
+ System.out.println("Iteration [readThreads=" + READ_THREADS +
+ ", updateThreads=" + UPDATE_THREADS + ", accounts=" + ACCOUNTS + ", iter=" + iter + ']');
- final int START_VAL = 10;
+ final TestCluster cluster = new TestCluster(1);
final Map<Object, Object> data = new TreeMap<>();
@@ -76,7 +126,28 @@ public class MvccTestApp {
List<Thread> threads = new ArrayList<>();
- for (int i = 0; i < 5; i++) {
+ Thread cleanupThread = new Thread(new Runnable() {
+ @Override public void run() {
+ Thread.currentThread().setName("cleanup");
+
+ try {
+ while (!stop.get()) {
+ cluster.cleanup();
+
+ Thread.sleep(1);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ threads.add(cleanupThread);
+
+ cleanupThread.start();
+
+ for (int i = 0; i < READ_THREADS; i++) {
final int id = i;
Thread thread = new Thread(new Runnable() {
@@ -127,7 +198,7 @@ public class MvccTestApp {
thread.start();
}
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < UPDATE_THREADS; i++) {
final int id = i;
Thread thread = new Thread(new Runnable() {
@@ -161,13 +232,15 @@ public class MvccTestApp {
thread.start();
}
- long endTime = System.currentTimeMillis() + 5000;
+ long endTime = System.currentTimeMillis() + 60_000;
while (!stop.get()) {
- Thread.sleep(100);
+ Thread.sleep(1000);
if (System.currentTimeMillis() >= endTime)
break;
+
+ //cluster.dumpMvccInfo();
}
stop.set(true);
@@ -195,6 +268,14 @@ public class MvccTestApp {
System.exit(1);
}
+// cluster.dumpMvccInfo();
+//
+// System.out.println("Cleanup");
+//
+// cluster.cleanup();
+//
+// cluster.dumpMvccInfo();
+
TestDebugLog.clear();
}
}
@@ -219,6 +300,13 @@ public class MvccTestApp {
nodes.add(new Node(i));
}
+ void cleanup() {
+ CoordinatorCounter cntr = crd.cleanupVersion();
+
+ for (Node node : nodes)
+ node.dataStore.cleanup(cntr);
+ }
+
void txPutAll(Map<Object, Object> data) {
TxId txId = new TxId(txIdGen.incrementAndGet());
@@ -345,6 +433,26 @@ public class MvccTestApp {
crd.txDone(txId);
}
+ public void dumpMvccInfo() {
+ for (Node node : nodes) {
+ for (Map.Entry<Object, MvccValue> e : node.dataStore.mainIdx.entrySet()) {
+ List<MvccValue> list = node.dataStore.mvccIdx.get(e.getKey());
+
+ int size = 0;
+
+ if (list != null) {
+ synchronized (list) {
+ size = list.size();
+ }
+ }
+
+ System.out.println("Mvcc info [key=" + e.getKey() +
+ ", val=" + e.getValue() +
+ ", mvccVals=" + size + ']');
+ }
+ }
+ }
+
public Map<Object, Object> getAll(Set<?> keys) {
MvccQueryVersion ver = crd.queryVersion();
@@ -407,6 +515,9 @@ public class MvccTestApp {
private final GridAtomicLong commitCntr = new GridAtomicLong(-1);
/** */
+ private final ConcurrentHashMap8<Long, QueryCounter> activeQueries = new ConcurrentHashMap8<>();
+
+ /** */
@GridToStringInclude
private final ConcurrentHashMap8<TxId, TxId> activeTxs = new ConcurrentHashMap8<>();
@@ -428,15 +539,12 @@ public class MvccTestApp {
commitCntr.setIfGreater(cntr.cntr);
}
- MvccQueryVersion queryVersion() {
- long useCntr = commitCntr.get();
-
- Set<TxId> txs = new HashSet<>();
-
+ private Long minActive(Set<TxId> txs) {
Long minActive = null;
for (Map.Entry<TxId, TxId> e : activeTxs.entrySet()) {
- txs.add(e.getKey());
+ if (txs != null)
+ txs.add(e.getKey());
TxId val = e.getValue();
@@ -451,14 +559,102 @@ public class MvccTestApp {
minActive = cntr;
}
+ return minActive;
+ }
+
+ static class QueryCounter extends AtomicInteger {
+ public QueryCounter(int initialValue) {
+ super(initialValue);
+ }
+
+ boolean increment2() {
+ for (;;) {
+ int current = get();
+ int next = current + 1;
+
+ if (current == 0)
+ return false;
+
+ if (compareAndSet(current, next))
+ return true;
+ }
+ }
+ }
+
+ private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ MvccQueryVersion queryVersion() {
+ rwLock.readLock().lock();
+
+ long useCntr = commitCntr.get();
+
+ Set<TxId> txs = new HashSet<>();
+
+ Long minActive = minActive(txs);
+
if (minActive != null && minActive < useCntr)
useCntr = minActive;
- return new MvccQueryVersion(new CoordinatorCounter(useCntr), txs);
+ MvccQueryVersion qryVer = new MvccQueryVersion(new CoordinatorCounter(useCntr), txs);
+
+ for (;;) {
+ QueryCounter qryCnt = activeQueries.get(useCntr);
+
+ if (qryCnt != null) {
+ boolean inc = qryCnt.increment2();
+
+ if (!inc) {
+ activeQueries.remove(useCntr, qryCnt);
+
+ continue;
+ }
+ }
+ else {
+ qryCnt = new QueryCounter(1);
+
+ if (activeQueries.putIfAbsent(useCntr, qryCnt) != null)
+ continue;
+ }
+
+ break;
+ }
+
+ rwLock.readLock().unlock();
+
+ return qryVer;
}
- void queryDone(CoordinatorCounter ctr) {
+ void queryDone(CoordinatorCounter cntr) {
+ AtomicInteger qryCnt = activeQueries.get(cntr.cntr);
+
+ assert qryCnt != null : cntr.cntr;
+
+ int left = qryCnt.decrementAndGet();
+
+ assert left >= 0 : left;
+
+ if (left == 0)
+ activeQueries.remove(cntr.cntr, qryCnt);
+ }
+
+ CoordinatorCounter cleanupVersion() {
+ rwLock.writeLock().lock();
+
+ long useCntr = commitCntr.get();
+
+ Long minActive = minActive(null);
+
+ if (minActive != null && minActive < useCntr)
+ useCntr = minActive - 1;
+
+ for (Long qryCntr : activeQueries.keySet()) {
+ if (qryCntr <= useCntr)
+ useCntr = qryCntr - 1;
+ }
+
+ rwLock.writeLock().unlock();
+ return new CoordinatorCounter(useCntr);
}
@Override public String toString() {
@@ -595,10 +791,46 @@ public class MvccTestApp {
private final ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>();
/** */
- private final ConcurrentHashMap<Object, MvccValue> mainIdx = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Object, MvccValue> mainIdx = new ConcurrentHashMap<>();
/** */
- private final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new ConcurrentHashMap<>();
+
+ void cleanup(CoordinatorCounter cleanupCntr) {
+ for (Map.Entry<Object, List<MvccValue>> e : mvccIdx.entrySet()) {
+ lockEntry(e.getKey());
+
+ try {
+ List<MvccValue> list = e.getValue();
+
+ synchronized (list) {
+ for (int i = list.size() - 1; i >= 0; i--) {
+ MvccValue val = list.get(i);
+
+ if (val.ver.cntr.compareTo(cleanupCntr) <= 0) {
+ if (DEBUG_LOG) {
+ TestDebugLog.msgs.add(new TestDebugLog.Msg6_1("cleanup",
+ e.getKey(), val.val, val.ver, cleanupCntr.cntr, null, null));
+ }
+
+ mainIdx.put(e.getKey(), val);
+
+ for (int j = 0; j <= i; j++)
+ list.remove(0);
+
+ if (list.isEmpty())
+ mvccIdx.remove(e.getKey());
+
+ break;
+ }
+ }
+ }
+ }
+ finally {
+ unlockEntry(e.getKey());
+ }
+ }
+ }
void lockEntry(Object key) {
ReentrantLock e = lock(key);
@@ -615,8 +847,11 @@ public class MvccTestApp {
void updateEntry(Object key, Object val, MvccUpdateVersion ver) {
List<MvccValue> list = mvccIdx.get(key);
- if (list == null)
- mvccIdx.put(key, list = new ArrayList<>());
+ if (list == null) {
+ Object old = mvccIdx.putIfAbsent(key, list = new ArrayList<>());
+
+ assert old == null;
+ }
synchronized (list) {
list.add(new MvccValue(val, ver));
@@ -626,8 +861,12 @@ public class MvccTestApp {
Object lastValue(Object key) {
List<MvccValue> list = mvccIdx.get(key);
- if (list != null)
- return list.get(list.size() - 1).val;
+ if (list != null) {
+ synchronized (list) {
+ if (list.size() > 0)
+ return list.get(list.size() - 1).val;
+ }
+ }
MvccValue val = mainIdx.get(key);
@@ -651,7 +890,7 @@ public class MvccTestApp {
continue;
if (DEBUG_LOG) {
- TestDebugLog.msgs.add(new TestDebugLog.Msg3("read val", key, val, val.ver));
+ TestDebugLog.msgs.add(new TestDebugLog.Msg3("read mvcc val", key, val, val.ver));
//log("get res [key=" + key + ", val=" + val.val + ", ver=" + val.ver + ']');
}
@@ -662,6 +901,19 @@ public class MvccTestApp {
MvccValue val = mainIdx.get(key);
+ if (val != null) {
+ int cmp = val.ver.cntr.compareTo(ver.cntr);
+
+ assert cmp <= 0 : "Committed [ver=" + val.ver + ", qryVer=" + ver.cntr + ']';
+
+ if (DEBUG_LOG)
+ TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted val", key, val, val.ver));
+ }
+ else {
+ if (DEBUG_LOG)
+ TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted null", key, null, null));
+ }
+
return val != null ? val.val : null;
}
@@ -798,7 +1050,34 @@ class TestDebugLog {
", id2=" + v4 +
", v2=" + v5 +
", cntr=" + v6 +
- ", msg=" + msg +
+ ", thread=" + thread +
+ ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+ static class Msg6_1 extends Message{
+ Object v1;
+ Object v2;
+ Object v3;
+ Object v4;
+ Object v5;
+ Object v6;
+
+ public Msg6_1(String msg, Object v1, Object v2, Object v3, Object v4, Object v5, Object v6) {
+ super(msg);
+ this.v1 = v1;
+ this.v2 = v2;
+ this.v3 = v3;
+ this.v4 = v4;
+ this.v5 = v5;
+ this.v6 = v6;
+ }
+
+ public String toString() {
+ return "Msg [msg=" + msg +
+ ", key=" + v1 +
+ ", val=" + v2 +
+ ", ver=" + v3 +
+ ", cleanupC=" + v4 +
", thread=" + thread +
", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
}
@@ -845,7 +1124,7 @@ class TestDebugLog {
System.out.println(msg);
}
- public static void addEntryMessage(Object key, Object val, Object arg, String msg) {
+ public static void addEntryMessage(Object key, Object val, String msg) {
if (key instanceof KeyCacheObject)
key = ((KeyCacheObject)key).value(null, false);
@@ -906,10 +1185,8 @@ class TestDebugLog {
if (msg instanceof Message) {
String msg0 = ((Message) msg).msg;
- if (msg0.equals("tx done") || msg0.equals("update"))
-
- w.println(msg.toString());
-
+ if (msg0.equals("tx done") || msg0.equals("update") || msg0.equals("cleanup"))
+ w.println(msg.toString());
}
}
@@ -1014,37 +1291,5 @@ class TestDebugLog {
it.remove();
}
}
- public static void main0(String[] args) throws Exception {
- final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(3);
-
- final int ACCOUNTS = 3;
-
- final int START_VAL = 100_000;
-
- final Map<Object, Object> data = new TreeMap<>();
-
- for (int i = 0; i < ACCOUNTS; i++)
- data.put(i, START_VAL);
-
- cluster.txPutAll(data);
-
- cluster.txTransfer(0, 1, true);
- cluster.txTransfer(0, 1, true);
- cluster.txTransfer(0, 2, true);
-
- Map<Object, Object> getData = cluster.getAll(data.keySet());
-
- int sum = 0;
-
- for (int i = 0; i < ACCOUNTS; i++) {
- Integer val = (Integer)getData.get(i);
-
- sum += val;
-
- System.out.println("Val: " + val);
- }
-
- System.out.println("Sum: " + sum);
- }
}
\ No newline at end of file