You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@ignite.apache.org by al...@apache.org on 2019/12/09 09:57:23 UTC
[ignite] branch master updated: IGNITE-11857 PartitionTxUpdateCounter optimization - Fixes #6686.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 15993fa IGNITE-11857 PartitionTxUpdateCounter optimization - Fixes #6686.
15993fa is described below
commit 15993fa358c2dc90f6b9ad8711ad78e04b539cd3
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Dec 9 12:55:43 2019 +0300
IGNITE-11857 PartitionTxUpdateCounter optimization - Fixes #6686.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../misc/JmhPartitionUpdateCounterBenchmark.java | 112 +++++++++++++++
.../cache/PartitionTxUpdateCounterImpl.java | 153 +++++++--------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 14 +-
.../distributed/near/GridNearTxPrepareRequest.java | 3 +-
4 files changed, 174 insertions(+), 108 deletions(-)
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java
new file mode 100644
index 0000000..9aa0102
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/JmhPartitionUpdateCounterBenchmark.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.misc;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmarks {@link PartitionTxUpdateCounterImpl} class.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Warmup(iterations = 5, time = 2)
+@Measurement(iterations = 10, time = 3)
+public class JmhPartitionUpdateCounterBenchmark {
+ /** Buffer size to store testing gaps. */
+ private static final int GAPS_BUFFER_SIZE = 50;
+
+ /** Max delta for next counter value. */
+ private static final int COUNTER_MAX_DELTA= 50;
+
+ /** Testing gaps buffer. */
+ private final long [][] gapsBuf = new long[GAPS_BUFFER_SIZE][];
+
+ /** Random numbers generator. */
+ private Random rnd;
+
+ /** Counter. */
+ private final AtomicLong reservedCntr = new AtomicLong();
+
+ /** Partition update counter. */
+ private final PartitionUpdateCounter partCntr = new PartitionTxUpdateCounterImpl();
+
+ /**
+ * Setup.
+ */
+ @Setup(Level.Iteration)
+ public void setup() {
+ rnd = new Random(0);
+
+ reservedCntr.set(0);
+
+ for (int i = 0; i < GAPS_BUFFER_SIZE; i++) {
+ long cntrDelta = rnd.nextInt(COUNTER_MAX_DELTA);
+
+ gapsBuf[i] = new long[] {reservedCntr.getAndAdd(cntrDelta), cntrDelta};
+ }
+
+ partCntr.reset();
+ }
+
+ /**
+ * Update partition update counter with random gap.
+ */
+ @Benchmark
+ public void updateWithGap() {
+ int nextIdx = rnd.nextInt(GAPS_BUFFER_SIZE);
+
+ partCntr.update(gapsBuf[nextIdx][0], gapsBuf[nextIdx][1]);
+
+ gapsBuf[nextIdx][0] = reservedCntr.getAndAdd(gapsBuf[nextIdx][1]);
+ }
+
+ /**
+ *
+ * @param args Args.
+ * @throws Exception Exception.
+ */
+ public static void main(String[] args) throws Exception {
+ final Options options = new OptionsBuilder()
+ .include(JmhPartitionUpdateCounterBenchmark.class.getSimpleName())
+ .build();
+
+ new Runner(options).run();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
index 14eb004..03293c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
@@ -23,8 +23,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
-import java.util.NavigableSet;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -32,7 +33,6 @@ import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -66,7 +66,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
private static final byte VERSION = 1;
/** Queue of applied out of order counter updates. */
- private TreeSet<Item> queue = new TreeSet<>();
+ private NavigableMap<Long, Item> queue = new TreeMap<>();
/** LWM. */
private final AtomicLong cntr = new AtomicLong();
@@ -104,7 +104,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
/** */
protected synchronized long highestAppliedCounter() {
- return queue.isEmpty() ? cntr.get() : queue.last().absolute();
+ return queue.isEmpty() ? cntr.get() : queue.lastEntry().getValue().absolute();
}
/**
@@ -145,8 +145,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
* during rebalance. All gaps are safe to "forget".
* Should only do it for first PME (later missed updates on node left are reset in {@link #finalizeUpdateCounters}. */
if (first) {
- if (!queue.isEmpty())
- queue.clear();
+ queue.clear();
first = false;
}
@@ -154,73 +153,54 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
/** {@inheritDoc} */
@Override public synchronized boolean update(long start, long delta) {
- long cur = cntr.get(), next;
+ long cur = cntr.get();
if (cur > start)
return false;
-
- if (cur < start) {
+ else if (cur < start) {
// Try merge with adjacent gaps in sequence.
- Item tmp = new Item(start, delta);
- Item ref = tmp;
-
- NavigableSet<Item> set = queue.headSet(tmp, false);
+ long next = start + delta;
- // Merge with previous, possibly modifying previous.
- if (!set.isEmpty()) {
- Item last = set.last();
+ // Merge with next.
+ Item nextItem = queue.remove(next);
- if (last.start + last.delta == start) {
- tmp = last;
-
- last.delta += delta;
- }
- else if (last.within(start) && last.within(start + delta - 1))
- return false;
- }
+ if (nextItem != null)
+ delta += nextItem.delta;
- // Merge with next, possibly modifying previous and removing next.
- if (!(set = queue.tailSet(tmp, false)).isEmpty()) {
- Item first = set.first();
+ // Merge with previous, possibly modifying previous.
+ Map.Entry<Long, Item> prev = queue.lowerEntry(start);
- if (tmp.start + tmp.delta == first.start) {
- if (ref != tmp) {
- tmp.delta += first.delta;
+ if (prev != null) {
+ Item prevItem = prev.getValue();
- set.pollFirst(); // Merge and remove obsolete head.
- }
- else {
- tmp = first;
+ if (prevItem.absolute() == start) {
+ prevItem.delta += delta;
- first.start = start;
- first.delta += delta;
- }
+ return true;
}
+ else if (prevItem.within(next - 1))
+ return false;
}
- if (tmp != ref)
- return true;
+ if (queue.size() >= MAX_MISSED_UPDATES) // Should trigger failure handler.
+ throw new IgniteException("Too many gaps [cntr=" + this + ']');
- return offer(new Item(start, delta)); // backup node with gaps
+ return queue.putIfAbsent(start, new Item(start, delta)) == null;
}
+ else { // cur == start
+ long next = start + delta;
- while (true) {
- boolean res = cntr.compareAndSet(cur, next = start + delta);
+ // There is only one next sequential item possible, all other items will be merged.
+ Item nextItem = queue.remove(next);
- assert res;
-
- Item peek = peek();
+ if (nextItem != null)
+ next += nextItem.delta;
- if (peek == null || peek.start != next)
- return true;
+ boolean res = cntr.compareAndSet(cur, next);
- Item item = poll();
-
- assert peek == item;
+ assert res;
- start = item.start;
- delta = item.delta;
- cur = next;
+ return true;
}
}
@@ -234,29 +214,9 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
reserveCntr.set(initCntr);
}
- /** */
- private Item poll() {
- return queue.pollFirst();
- }
-
- /** */
- private Item peek() {
- return queue.isEmpty() ? null : queue.first();
- }
-
- /**
- * @param item Item.
- */
- private boolean offer(Item item) {
- if (queue.size() == MAX_MISSED_UPDATES) // Should trigger failure handler.
- throw new IgniteException("Too many gaps [cntr=" + this + ']');
-
- return queue.add(item);
- }
-
/** {@inheritDoc} */
@Override public synchronized GridLongList finalizeUpdateCounters() {
- Item item = poll();
+ Map.Entry<Long, Item> item = queue.pollFirstEntry();
GridLongList gaps = null;
@@ -265,15 +225,15 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
gaps = new GridLongList((queue.size() + 1) * 2);
long start = cntr.get() + 1;
- long end = item.start;
+ long end = item.getValue().start;
gaps.add(start);
gaps.add(end);
// Close pending ranges.
- cntr.set(item.start + item.delta);
+ cntr.set(item.getValue().absolute());
- item = poll();
+ item = queue.pollFirstEntry();
}
reserveCntr.set(get());
@@ -299,7 +259,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
/** {@inheritDoc} */
@Override public synchronized boolean sequential() {
- return gaps().isEmpty();
+ return queue.isEmpty();
}
/** {@inheritDoc} */
@@ -318,7 +278,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
dos.writeInt(size);
- for (Item item : queue) {
+ for (Item item : queue.values()) {
dos.writeLong(item.start);
dos.writeLong(item.delta);
}
@@ -335,11 +295,11 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
/**
* @param raw Raw bytes.
*/
- private @Nullable TreeSet<Item> fromBytes(@Nullable byte[] raw) {
- if (raw == null)
- return new TreeSet<>();
+ private @Nullable NavigableMap<Long, Item> fromBytes(@Nullable byte[] raw) {
+ NavigableMap<Long, Item> ret = new TreeMap<>();
- TreeSet<Item> ret = new TreeSet<>();
+ if (raw == null)
+ return ret;
try {
ByteArrayInputStream bis = new ByteArrayInputStream(raw);
@@ -350,8 +310,11 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
int cnt = dis.readInt(); // Holes count.
- while(cnt-- > 0)
- ret.add(new Item(dis.readLong(), dis.readLong()));
+ while (cnt-- > 0) {
+ Item item = new Item(dis.readLong(), dis.readLong());
+
+ ret.put(item.start, item);
+ }
return ret;
}
@@ -360,24 +323,19 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
}
}
- /** */
- private TreeSet<Item> gaps() {
- return queue;
- }
-
/** {@inheritDoc} */
@Override public synchronized void reset() {
cntr.set(0);
reserveCntr.set(0);
- queue = new TreeSet<>();
+ queue.clear();
}
/**
* Update counter task. Update from start value by delta value.
*/
- private static class Item implements Comparable<Item> {
+ private static class Item {
/** */
private long start;
@@ -394,11 +352,6 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
}
/** {@inheritDoc} */
- @Override public int compareTo(@NotNull Item o) {
- return Long.compare(this.start, o.start);
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return "Item [" +
"start=" + start +
@@ -469,9 +422,7 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
/** {@inheritDoc} */
@Override public Iterator<long[]> iterator() {
- return F.iterator(queue.iterator(), item -> {
- return new long[] {item.start, item.delta};
- }, true);
+ return F.iterator(queue.values().iterator(), item -> new long[] {item.start, item.delta}, true);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 6a42c1b..78fbe9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -934,7 +934,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
*/
private void addDhtValues(GridNearTxPrepareResponse res) {
// Interceptor on near node needs old values to execute callbacks.
- if (!F.isEmpty(req.writes())) {
+ if (req.writes() != null) {
for (IgniteTxEntry e : req.writes()) {
IgniteTxEntry txEntry = tx.entry(e.txKey());
@@ -1057,14 +1057,16 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
boolean validateCache = needCacheValidation(node);
+ boolean writesEmpty = isEmpty(req.writes());
+
if (validateCache) {
GridDhtTopologyFuture topFut = cctx.exchange().lastFinishedFuture();
- if (topFut != null && !isEmpty(req.writes())) {
+ if (topFut != null && !writesEmpty) {
// All caches either read only or not. So validation of one cache context is enough.
GridCacheContext ctx = F.first(req.writes()).context();
- Throwable err = topFut.validateCache(ctx, req.recovery(), isEmpty(req.writes()), null, null);
+ Throwable err = topFut.validateCache(ctx, req.recovery(), writesEmpty, null, null);
if (err != null)
onDone(null, new IgniteCheckedException(err));
@@ -1073,7 +1075,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
boolean ser = tx.serializable() && tx.optimistic();
- if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) {
+ if (!writesEmpty || (ser && !F.isEmpty(req.reads()))) {
Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
for (IgniteTxEntry entry : req.writes())
@@ -1309,7 +1311,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
TxCounters counters = tx.txCounters(true);
// Assign keys to primary nodes.
- if (!F.isEmpty(req.writes())) {
+ if (req.writes() != null) {
for (IgniteTxEntry write : req.writes()) {
IgniteTxEntry entry = tx.entry(write.txKey());
@@ -1326,7 +1328,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
}
}
- if (!F.isEmpty(req.reads())) {
+ if (req.reads() != null) {
for (IgniteTxEntry read : req.reads())
map(tx.entry(read.txKey()));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 941c7b3..c860bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -277,7 +278,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
*/
private Collection<IgniteTxEntry> cloneEntries(Collection<IgniteTxEntry> c) {
if (F.isEmpty(c))
- return c;
+ return Collections.emptyList();
Collection<IgniteTxEntry> cp = new ArrayList<>(c.size());