You are viewing a plain-text version of this content. The canonical version is available via the original archive link, which is not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by al...@apache.org on 2019/12/09 09:57:23 UTC

[ignite] branch master updated: IGNITE-11857 PartitionTxUpdateCounter optimization - Fixes #6686.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 15993fa  IGNITE-11857 PartitionTxUpdateCounter optimization  - Fixes #6686.
15993fa is described below

commit 15993fa358c2dc90f6b9ad8711ad78e04b539cd3
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Dec 9 12:55:43 2019 +0300

    IGNITE-11857 PartitionTxUpdateCounter optimization  - Fixes #6686.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../misc/JmhPartitionUpdateCounterBenchmark.java   | 112 +++++++++++++++
 .../cache/PartitionTxUpdateCounterImpl.java        | 153 +++++++--------------
 .../distributed/dht/GridDhtTxPrepareFuture.java    |  14 +-
 .../distributed/near/GridNearTxPrepareRequest.java |   3 +-
 4 files changed, 174 insertions(+), 108 deletions(-)

diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java
new file mode 100644
index 0000000..9aa0102
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.misc;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmarks {@link PartitionTxUpdateCounterImpl} class.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Warmup(iterations = 5, time = 2)
+@Measurement(iterations = 10, time = 3)
+public class JmhPartitionUpdateCounterBenchmark {
+    /** Buffer size to store testing gaps. */
+    private static final int GAPS_BUFFER_SIZE = 50;
+
+    /** Max delta for next counter value. */
+    private static final int COUNTER_MAX_DELTA= 50;
+
+    /** Testing gaps buffer. */
+    private final long [][] gapsBuf = new long[GAPS_BUFFER_SIZE][];
+
+    /** Random numbers generator. */
+    private Random rnd;
+
+    /** Counter. */
+    private final AtomicLong reservedCntr = new AtomicLong();
+
+    /** Partition update counter. */
+    private final PartitionUpdateCounter partCntr = new PartitionTxUpdateCounterImpl();
+
+    /**
+     * Setup.
+     */
+    @Setup(Level.Iteration)
+    public void setup() {
+        rnd = new Random(0);
+
+        reservedCntr.set(0);
+
+        for (int i = 0; i < GAPS_BUFFER_SIZE; i++) {
+            long cntrDelta = rnd.nextInt(COUNTER_MAX_DELTA);
+
+            gapsBuf[i] = new long[] {reservedCntr.getAndAdd(cntrDelta), cntrDelta};
+        }
+
+        partCntr.reset();
+    }
+
+    /**
+     * Update partition update counter with random gap.
+     */
+    @Benchmark
+    public void updateWithGap() {
+        int nextIdx = rnd.nextInt(GAPS_BUFFER_SIZE);
+
+        partCntr.update(gapsBuf[nextIdx][0], gapsBuf[nextIdx][1]);
+
+        gapsBuf[nextIdx][0] = reservedCntr.getAndAdd(gapsBuf[nextIdx][1]);
+    }
+
+    /**
+     *
+     * @param args Args.
+     * @throws Exception Exception.
+     */
+    public static void main(String[] args) throws Exception {
+        final Options options = new OptionsBuilder()
+            .include(JmhPartitionUpdateCounterBenchmark.class.getSimpleName())
+            .build();
+
+        new Runner(options).run();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
index 14eb004..03293c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
@@ -23,8 +23,9 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.NavigableSet;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -32,7 +33,6 @@ import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -66,7 +66,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
     private static final byte VERSION = 1;
 
     /** Queue of applied out of order counter updates. */
-    private TreeSet<Item> queue = new TreeSet<>();
+    private NavigableMap<Long, Item> queue = new TreeMap<>();
 
     /** LWM. */
     private final AtomicLong cntr = new AtomicLong();
@@ -104,7 +104,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
 
     /** */
     protected synchronized long highestAppliedCounter() {
-        return queue.isEmpty() ? cntr.get() : queue.last().absolute();
+        return queue.isEmpty() ? cntr.get() : queue.lastEntry().getValue().absolute();
     }
 
     /**
@@ -145,8 +145,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
          * during rebalance. All gaps are safe to "forget".
          * Should only do it for first PME (later missed updates on node left are reset in {@link #finalizeUpdateCounters}. */
         if (first) {
-            if (!queue.isEmpty())
-                queue.clear();
+            queue.clear();
 
             first = false;
         }
@@ -154,73 +153,54 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
 
     /** {@inheritDoc} */
     @Override public synchronized boolean update(long start, long delta) {
-        long cur = cntr.get(), next;
+        long cur = cntr.get();
 
         if (cur > start)
             return false;
-
-        if (cur < start) {
+        else if (cur < start) {
             // Try merge with adjacent gaps in sequence.
-            Item tmp = new Item(start, delta);
-            Item ref = tmp;
-
-            NavigableSet<Item> set = queue.headSet(tmp, false);
+            long next = start + delta;
 
-            // Merge with previous, possibly modifying previous.
-            if (!set.isEmpty()) {
-                Item last = set.last();
+            // Merge with next.
+            Item nextItem = queue.remove(next);
 
-                if (last.start + last.delta == start) {
-                    tmp = last;
-
-                    last.delta += delta;
-                }
-                else if (last.within(start) && last.within(start + delta - 1))
-                    return false;
-            }
+            if (nextItem != null)
+                delta += nextItem.delta;
 
-            // Merge with next, possibly modifying previous and removing next.
-            if (!(set = queue.tailSet(tmp, false)).isEmpty()) {
-                Item first = set.first();
+            // Merge with previous, possibly modifying previous.
+            Map.Entry<Long, Item> prev = queue.lowerEntry(start);
 
-                if (tmp.start + tmp.delta == first.start) {
-                    if (ref != tmp) {
-                        tmp.delta += first.delta;
+            if (prev != null) {
+                Item prevItem = prev.getValue();
 
-                        set.pollFirst(); // Merge and remove obsolete head.
-                    }
-                    else {
-                        tmp = first;
+                if (prevItem.absolute() == start) {
+                    prevItem.delta += delta;
 
-                        first.start = start;
-                        first.delta += delta;
-                    }
+                    return true;
                 }
+                else if (prevItem.within(next - 1))
+                    return false;
             }
 
-            if (tmp != ref)
-                return true;
+            if (queue.size() >= MAX_MISSED_UPDATES) // Should trigger failure handler.
+                throw new IgniteException("Too many gaps [cntr=" + this + ']');
 
-            return offer(new Item(start, delta)); // backup node with gaps
+            return queue.putIfAbsent(start, new Item(start, delta)) == null;
         }
+        else { // cur == start
+            long next = start + delta;
 
-        while (true) {
-            boolean res = cntr.compareAndSet(cur, next = start + delta);
+            // There is only one next sequential item possible, all other items will be merged.
+            Item nextItem = queue.remove(next);
 
-            assert res;
-
-            Item peek = peek();
+            if (nextItem != null)
+                next += nextItem.delta;
 
-            if (peek == null || peek.start != next)
-                return true;
+            boolean res = cntr.compareAndSet(cur, next);
 
-            Item item = poll();
-
-            assert peek == item;
+            assert res;
 
-            start = item.start;
-            delta = item.delta;
-            cur = next;
+            return true;
         }
     }
 
@@ -234,29 +214,9 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
             reserveCntr.set(initCntr);
     }
 
-    /** */
-    private Item poll() {
-        return queue.pollFirst();
-    }
-
-    /** */
-    private Item peek() {
-        return queue.isEmpty() ? null : queue.first();
-    }
-
-    /**
-     * @param item Item.
-     */
-    private boolean offer(Item item) {
-        if (queue.size() == MAX_MISSED_UPDATES) // Should trigger failure handler.
-            throw new IgniteException("Too many gaps [cntr=" + this + ']');
-
-        return queue.add(item);
-    }
-
     /** {@inheritDoc} */
     @Override public synchronized GridLongList finalizeUpdateCounters() {
-        Item item = poll();
+        Map.Entry<Long, Item> item = queue.pollFirstEntry();
 
         GridLongList gaps = null;
 
@@ -265,15 +225,15 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
                 gaps = new GridLongList((queue.size() + 1) * 2);
 
             long start = cntr.get() + 1;
-            long end = item.start;
+            long end = item.getValue().start;
 
             gaps.add(start);
             gaps.add(end);
 
             // Close pending ranges.
-            cntr.set(item.start + item.delta);
+            cntr.set(item.getValue().absolute());
 
-            item = poll();
+            item = queue.pollFirstEntry();
         }
 
         reserveCntr.set(get());
@@ -299,7 +259,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
 
     /** {@inheritDoc} */
     @Override public synchronized boolean sequential() {
-        return gaps().isEmpty();
+        return queue.isEmpty();
     }
 
     /** {@inheritDoc} */
@@ -318,7 +278,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
 
             dos.writeInt(size);
 
-            for (Item item : queue) {
+            for (Item item : queue.values()) {
                 dos.writeLong(item.start);
                 dos.writeLong(item.delta);
             }
@@ -335,11 +295,11 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
     /**
      * @param raw Raw bytes.
      */
-    private @Nullable TreeSet<Item> fromBytes(@Nullable byte[] raw) {
-        if (raw == null)
-            return new TreeSet<>();
+    private @Nullable NavigableMap<Long, Item> fromBytes(@Nullable byte[] raw) {
+        NavigableMap<Long, Item> ret = new TreeMap<>();
 
-        TreeSet<Item> ret = new TreeSet<>();
+        if (raw == null)
+            return ret;
 
         try {
             ByteArrayInputStream bis = new ByteArrayInputStream(raw);
@@ -350,8 +310,11 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
 
             int cnt = dis.readInt(); // Holes count.
 
-            while(cnt-- > 0)
-                ret.add(new Item(dis.readLong(), dis.readLong()));
+            while (cnt-- > 0) {
+                Item item = new Item(dis.readLong(), dis.readLong());
+
+                ret.put(item.start, item);
+            }
 
             return ret;
         }
@@ -360,24 +323,19 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
         }
     }
 
-    /** */
-    private TreeSet<Item> gaps() {
-        return queue;
-    }
-
     /** {@inheritDoc} */
     @Override public synchronized void reset() {
         cntr.set(0);
 
         reserveCntr.set(0);
 
-        queue = new TreeSet<>();
+        queue.clear();
     }
 
     /**
      * Update counter task. Update from start value by delta value.
      */
-    private static class Item implements Comparable<Item> {
+    private static class Item {
         /** */
         private long start;
 
@@ -394,11 +352,6 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
         }
 
         /** {@inheritDoc} */
-        @Override public int compareTo(@NotNull Item o) {
-            return Long.compare(this.start, o.start);
-        }
-
-        /** {@inheritDoc} */
         @Override public String toString() {
             return "Item [" +
                 "start=" + start +
@@ -469,9 +422,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
 
     /** {@inheritDoc} */
     @Override public Iterator<long[]> iterator() {
-        return F.iterator(queue.iterator(), item -> {
-            return new long[] {item.start, item.delta};
-        }, true);
+        return F.iterator(queue.values().iterator(), item -> new long[] {item.start, item.delta}, true);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 6a42c1b..78fbe9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -934,7 +934,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
      */
     private void addDhtValues(GridNearTxPrepareResponse res) {
         // Interceptor on near node needs old values to execute callbacks.
-        if (!F.isEmpty(req.writes())) {
+        if (req.writes() != null) {
             for (IgniteTxEntry e : req.writes()) {
                 IgniteTxEntry txEntry = tx.entry(e.txKey());
 
@@ -1057,14 +1057,16 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
 
         boolean validateCache = needCacheValidation(node);
 
+        boolean writesEmpty = isEmpty(req.writes());
+
         if (validateCache) {
             GridDhtTopologyFuture topFut = cctx.exchange().lastFinishedFuture();
 
-            if (topFut != null && !isEmpty(req.writes())) {
+            if (topFut != null && !writesEmpty) {
                 // All caches either read only or not. So validation of one cache context is enough.
                 GridCacheContext ctx = F.first(req.writes()).context();
 
-                Throwable err = topFut.validateCache(ctx, req.recovery(), isEmpty(req.writes()), null, null);
+                Throwable err = topFut.validateCache(ctx, req.recovery(), writesEmpty, null, null);
 
                 if (err != null)
                     onDone(null, new IgniteCheckedException(err));
@@ -1073,7 +1075,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
 
         boolean ser = tx.serializable() && tx.optimistic();
 
-        if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) {
+        if (!writesEmpty || (ser && !F.isEmpty(req.reads()))) {
             Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
 
             for (IgniteTxEntry entry : req.writes())
@@ -1309,7 +1311,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             TxCounters counters = tx.txCounters(true);
 
             // Assign keys to primary nodes.
-            if (!F.isEmpty(req.writes())) {
+            if (req.writes() != null) {
                 for (IgniteTxEntry write : req.writes()) {
                     IgniteTxEntry entry = tx.entry(write.txKey());
 
@@ -1326,7 +1328,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 }
             }
 
-            if (!F.isEmpty(req.reads())) {
+            if (req.reads() != null) {
                 for (IgniteTxEntry read : req.reads())
                     map(tx.entry(read.txKey()));
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 941c7b3..c860bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -277,7 +278,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
      */
     private Collection<IgniteTxEntry> cloneEntries(Collection<IgniteTxEntry> c) {
         if (F.isEmpty(c))
-            return c;
+            return Collections.emptyList();
 
         Collection<IgniteTxEntry> cp = new ArrayList<>(c.size());