You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/28 14:48:09 UTC

[8/8] ignite git commit: ignite-6149

ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6de99efa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6de99efa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6de99efa

Branch: refs/heads/ignite-6149
Commit: 6de99efa547f0fcd80946c2a3b9825f4b944b439
Parents: 57b9e8d
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 28 16:07:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 28 17:45:12 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/MvccTestApp.java | 369 +++++++++++++++----
 1 file changed, 307 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6de99efa/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
index b99e805..fe36e1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
@@ -37,8 +37,11 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -53,17 +56,64 @@ public class MvccTestApp {
     /** */
     private static final boolean DEBUG_LOG = false;
 
+    public static void main0(String[] args) throws Exception {
+        final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(3);
+
+        final int ACCOUNTS = 3;
+
+        final int START_VAL = 100_000;
+
+        final Map<Object, Object> data = new TreeMap<>();
+
+        for (int i = 0; i < ACCOUNTS; i++)
+            data.put(i, START_VAL);
+
+        cluster.txPutAll(data);
+
+        cluster.txTransfer(0, 1, true);
+        cluster.txTransfer(0, 1, true);
+        cluster.txTransfer(0, 2, true);
+
+        Map<Object, Object> getData = cluster.getAll(data.keySet());
+
+        int sum = 0;
+
+        for (int i = 0; i < ACCOUNTS; i++) {
+            Integer val = (Integer)getData.get(i);
+
+            sum += val;
+
+            System.out.println("Val: " + val);
+        }
+
+        System.out.println("Sum: " + sum);
+
+        cluster.cleanup();
+
+        getData = cluster.getAll(data.keySet());
+
+        MvccQueryVersion ver1 = cluster.crd.queryVersion();
+        MvccQueryVersion ver2 = cluster.crd.queryVersion();
+
+        cluster.crd.queryDone(ver2.cntr);
+        cluster.crd.queryDone(ver1.cntr);
+
+    }
+
     public static void main(String[] args) throws Exception {
         final AtomicBoolean err = new AtomicBoolean();
 
-        for (int iter = 0; iter < 10; iter++) {
-            System.out.println("Iteration: " + iter);
+        final int READ_THREADS = 4;
+        final int UPDATE_THREADS = 6;
+        final int ACCOUNTS = 100;
 
-            final TestCluster cluster = new TestCluster(1);
+        final int START_VAL = 1000;
 
-            final int ACCOUNTS = 4;
+        for (int iter = 0; iter < 10; iter++) {
+            System.out.println("Iteration [readThreads=" + READ_THREADS +
+                ", updateThreads=" + UPDATE_THREADS + ", accounts=" + ACCOUNTS + ", iter=" + iter + ']');
 
-            final int START_VAL = 10;
+            final TestCluster cluster = new TestCluster(1);
 
             final Map<Object, Object> data = new TreeMap<>();
 
@@ -76,7 +126,28 @@ public class MvccTestApp {
 
             List<Thread> threads = new ArrayList<>();
 
-            for (int i = 0; i < 5; i++) {
+            Thread cleanupThread = new Thread(new Runnable() {
+                @Override public void run() {
+                    Thread.currentThread().setName("cleanup");
+
+                    try {
+                        while (!stop.get()) {
+                            cluster.cleanup();
+
+                            Thread.sleep(1);
+                        }
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+
+            threads.add(cleanupThread);
+
+            cleanupThread.start();
+
+            for (int i = 0; i < READ_THREADS; i++) {
                 final int id = i;
 
                 Thread thread = new Thread(new Runnable() {
@@ -127,7 +198,7 @@ public class MvccTestApp {
                 thread.start();
             }
 
-            for (int i = 0; i < 2; i++) {
+            for (int i = 0; i < UPDATE_THREADS; i++) {
                 final int id = i;
 
                 Thread thread = new Thread(new Runnable() {
@@ -161,13 +232,15 @@ public class MvccTestApp {
                 thread.start();
             }
 
-            long endTime = System.currentTimeMillis() + 5000;
+            long endTime = System.currentTimeMillis() + 60_000;
 
             while (!stop.get()) {
-                Thread.sleep(100);
+                Thread.sleep(1000);
 
                 if (System.currentTimeMillis() >= endTime)
                     break;
+
+                //cluster.dumpMvccInfo();
             }
 
             stop.set(true);
@@ -195,6 +268,14 @@ public class MvccTestApp {
                 System.exit(1);
             }
 
+//            cluster.dumpMvccInfo();
+//
+//            System.out.println("Cleanup");
+//
+//            cluster.cleanup();
+//
+//            cluster.dumpMvccInfo();
+
             TestDebugLog.clear();
         }
     }
@@ -219,6 +300,13 @@ public class MvccTestApp {
                 nodes.add(new Node(i));
         }
 
+        void cleanup() {
+            CoordinatorCounter cntr = crd.cleanupVersion();
+
+            for (Node node : nodes)
+                node.dataStore.cleanup(cntr);
+        }
+
         void txPutAll(Map<Object, Object> data) {
             TxId txId = new TxId(txIdGen.incrementAndGet());
 
@@ -345,6 +433,26 @@ public class MvccTestApp {
             crd.txDone(txId);
         }
 
+        public void dumpMvccInfo() {
+            for (Node node : nodes) {
+                for (Map.Entry<Object, MvccValue> e : node.dataStore.mainIdx.entrySet()) {
+                    List<MvccValue> list = node.dataStore.mvccIdx.get(e.getKey());
+
+                    int size = 0;
+
+                    if (list != null) {
+                        synchronized (list) {
+                            size = list.size();
+                        }
+                    }
+
+                    System.out.println("Mvcc info [key=" + e.getKey() +
+                        ", val=" + e.getValue() +
+                        ", mvccVals=" + size + ']');
+                }
+            }
+        }
+
         public Map<Object, Object> getAll(Set<?> keys) {
             MvccQueryVersion ver = crd.queryVersion();
 
@@ -407,6 +515,9 @@ public class MvccTestApp {
         private final GridAtomicLong commitCntr = new GridAtomicLong(-1);
 
         /** */
+        private final ConcurrentHashMap8<Long, QueryCounter> activeQueries = new ConcurrentHashMap8<>();
+
+        /** */
         @GridToStringInclude
         private final ConcurrentHashMap8<TxId, TxId> activeTxs = new ConcurrentHashMap8<>();
 
@@ -428,15 +539,12 @@ public class MvccTestApp {
             commitCntr.setIfGreater(cntr.cntr);
         }
 
-        MvccQueryVersion queryVersion() {
-            long useCntr = commitCntr.get();
-
-            Set<TxId> txs = new HashSet<>();
-
+        private Long minActive(Set<TxId> txs) {
             Long minActive = null;
 
             for (Map.Entry<TxId, TxId> e : activeTxs.entrySet()) {
-                txs.add(e.getKey());
+                if (txs != null)
+                    txs.add(e.getKey());
 
                 TxId val = e.getValue();
 
@@ -451,14 +559,102 @@ public class MvccTestApp {
                     minActive = cntr;
             }
 
+            return minActive;
+        }
+
+        static class QueryCounter extends AtomicInteger {
+            public QueryCounter(int initialValue) {
+                super(initialValue);
+            }
+
+            boolean increment2() {
+                for (;;) {
+                    int current = get();
+                    int next = current + 1;
+
+                    if (current == 0)
+                        return false;
+
+                    if (compareAndSet(current, next))
+                        return true;
+                }
+            }
+        }
+
+        private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+        MvccQueryVersion queryVersion() {
+            rwLock.readLock().lock();
+
+            long useCntr = commitCntr.get();
+
+            Set<TxId> txs = new HashSet<>();
+
+            Long minActive = minActive(txs);
+
             if (minActive != null && minActive < useCntr)
                 useCntr = minActive;
 
-            return new MvccQueryVersion(new CoordinatorCounter(useCntr), txs);
+            MvccQueryVersion qryVer = new MvccQueryVersion(new CoordinatorCounter(useCntr), txs);
+
+            for (;;) {
+                QueryCounter qryCnt = activeQueries.get(useCntr);
+
+                if (qryCnt != null) {
+                    boolean inc = qryCnt.increment2();
+
+                    if (!inc) {
+                        activeQueries.remove(useCntr, qryCnt);
+
+                        continue;
+                    }
+                }
+                else {
+                    qryCnt = new QueryCounter(1);
+
+                    if (activeQueries.putIfAbsent(useCntr, qryCnt) != null)
+                        continue;
+                }
+
+                break;
+            }
+
+            rwLock.readLock().unlock();
+
+            return qryVer;
         }
 
-        void queryDone(CoordinatorCounter ctr) {
+        void queryDone(CoordinatorCounter cntr) {
+            AtomicInteger qryCnt = activeQueries.get(cntr.cntr);
+
+            assert qryCnt != null : cntr.cntr;
+
+            int left = qryCnt.decrementAndGet();
+
+            assert left >= 0 : left;
+
+            if (left == 0)
+                activeQueries.remove(cntr.cntr, qryCnt);
+        }
+
+        CoordinatorCounter cleanupVersion() {
+            rwLock.writeLock().lock();
+
+            long useCntr = commitCntr.get();
+
+            Long minActive = minActive(null);
+
+            if (minActive != null && minActive < useCntr)
+                useCntr = minActive - 1;
+
+            for (Long qryCntr : activeQueries.keySet()) {
+                if (qryCntr <= useCntr)
+                    useCntr = qryCntr - 1;
+            }
+
+            rwLock.writeLock().unlock();
 
+            return new CoordinatorCounter(useCntr);
         }
 
         @Override public String toString() {
@@ -595,10 +791,46 @@ public class MvccTestApp {
         private final ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>();
 
         /** */
-        private final ConcurrentHashMap<Object, MvccValue> mainIdx = new ConcurrentHashMap<>();
+        final ConcurrentHashMap<Object, MvccValue> mainIdx = new ConcurrentHashMap<>();
 
         /** */
-        private final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new ConcurrentHashMap<>();
+        final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new ConcurrentHashMap<>();
+
+        void cleanup(CoordinatorCounter cleanupCntr) {
+            for (Map.Entry<Object, List<MvccValue>> e : mvccIdx.entrySet()) {
+                lockEntry(e.getKey());
+
+                try {
+                    List<MvccValue> list = e.getValue();
+
+                    synchronized (list) {
+                        for (int i = list.size() - 1; i >= 0; i--) {
+                            MvccValue val = list.get(i);
+
+                            if (val.ver.cntr.compareTo(cleanupCntr) <= 0) {
+                                if (DEBUG_LOG) {
+                                    TestDebugLog.msgs.add(new TestDebugLog.Msg6_1("cleanup",
+                                        e.getKey(), val.val, val.ver, cleanupCntr.cntr, null, null));
+                                }
+
+                                mainIdx.put(e.getKey(), val);
+
+                                for (int j = 0; j <= i; j++)
+                                    list.remove(0);
+
+                                if (list.isEmpty())
+                                    mvccIdx.remove(e.getKey());
+
+                                break;
+                            }
+                        }
+                    }
+                }
+                finally {
+                    unlockEntry(e.getKey());
+                }
+            }
+        }
 
         void lockEntry(Object key) {
             ReentrantLock e = lock(key);
@@ -615,8 +847,11 @@ public class MvccTestApp {
         void updateEntry(Object key, Object val, MvccUpdateVersion ver) {
             List<MvccValue> list = mvccIdx.get(key);
 
-            if (list == null)
-                mvccIdx.put(key, list = new ArrayList<>());
+            if (list == null) {
+                Object old = mvccIdx.putIfAbsent(key, list = new ArrayList<>());
+
+                assert old == null;
+            }
 
             synchronized (list) {
                 list.add(new MvccValue(val, ver));
@@ -626,8 +861,12 @@ public class MvccTestApp {
         Object lastValue(Object key) {
             List<MvccValue> list = mvccIdx.get(key);
 
-            if (list != null)
-                return list.get(list.size() - 1).val;
+            if (list != null) {
+                synchronized (list) {
+                    if (list.size() > 0)
+                        return list.get(list.size() - 1).val;
+                }
+            }
 
             MvccValue val = mainIdx.get(key);
 
@@ -651,7 +890,7 @@ public class MvccTestApp {
                             continue;
 
                         if (DEBUG_LOG) {
-                            TestDebugLog.msgs.add(new TestDebugLog.Msg3("read val", key, val, val.ver));
+                            TestDebugLog.msgs.add(new TestDebugLog.Msg3("read mvcc val", key, val, val.ver));
                             //log("get res [key=" + key + ", val=" + val.val + ", ver=" + val.ver + ']');
                         }
 
@@ -662,6 +901,19 @@ public class MvccTestApp {
 
             MvccValue val = mainIdx.get(key);
 
+            if (val != null) {
+                int cmp = val.ver.cntr.compareTo(ver.cntr);
+
+                assert cmp <= 0 : "Committed [ver=" + val.ver + ", qryVer=" + ver.cntr + ']';
+
+                if (DEBUG_LOG)
+                    TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted val", key, val, val.ver));
+            }
+            else {
+                if (DEBUG_LOG)
+                    TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted null", key, null, null));
+            }
+
             return val != null ? val.val : null;
         }
 
@@ -798,7 +1050,34 @@ class TestDebugLog {
                 ", id2=" + v4 +
                 ", v2=" + v5 +
                 ", cntr=" + v6 +
-                ", msg=" + msg +
+                ", thread=" + thread +
+                ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+        }
+    }
+    static class Msg6_1 extends Message{
+        Object v1;
+        Object v2;
+        Object v3;
+        Object v4;
+        Object v5;
+        Object v6;
+
+        public Msg6_1(String msg, Object v1, Object v2, Object v3, Object v4, Object v5, Object v6) {
+            super(msg);
+            this.v1 = v1;
+            this.v2 = v2;
+            this.v3 = v3;
+            this.v4 = v4;
+            this.v5 = v5;
+            this.v6 = v6;
+        }
+
+        public String toString() {
+            return "Msg [msg=" + msg +
+                ", key=" + v1 +
+                ", val=" + v2 +
+                ", ver=" + v3 +
+                ", cleanupC=" + v4 +
                 ", thread=" + thread +
                 ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
         }
@@ -845,7 +1124,7 @@ class TestDebugLog {
             System.out.println(msg);
     }
 
-    public static void addEntryMessage(Object key, Object val, Object arg, String msg) {
+    public static void addEntryMessage(Object key, Object val, String msg) {
         if (key instanceof KeyCacheObject)
             key = ((KeyCacheObject)key).value(null, false);
 
@@ -906,10 +1185,8 @@ class TestDebugLog {
                 if (msg instanceof Message) {
                     String msg0 = ((Message) msg).msg;
 
-                    if (msg0.equals("tx done") || msg0.equals("update"))
-
-                    w.println(msg.toString());
-
+                    if (msg0.equals("tx done") || msg0.equals("update") || msg0.equals("cleanup"))
+                        w.println(msg.toString());
                 }
             }
 
@@ -1014,37 +1291,5 @@ class TestDebugLog {
                 it.remove();
         }
     }
-    public static void main0(String[] args) throws Exception {
-        final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(3);
-
-        final int ACCOUNTS = 3;
-
-        final int START_VAL = 100_000;
-
-        final Map<Object, Object> data = new TreeMap<>();
-
-        for (int i = 0; i < ACCOUNTS; i++)
-            data.put(i, START_VAL);
-
-        cluster.txPutAll(data);
-
-        cluster.txTransfer(0, 1, true);
-        cluster.txTransfer(0, 1, true);
-        cluster.txTransfer(0, 2, true);
-
-        Map<Object, Object> getData = cluster.getAll(data.keySet());
-
-        int sum = 0;
-
-        for (int i = 0; i < ACCOUNTS; i++) {
-            Integer val = (Integer)getData.get(i);
-
-            sum += val;
-
-            System.out.println("Val: " + val);
-        }
-
-        System.out.println("Sum: " + sum);
-    }
 
 }
\ No newline at end of file