You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:46 UTC
[01/10] ignite git commit: ignite-4680-2
Repository: ignite
Updated Branches:
refs/heads/ignite-4680-sb [created] e59edc930
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 5a8904f..1a87ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -41,6 +41,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
/** Timestamp used as base time for cache topology version (January 1, 2014). */
public static final long TOP_VER_BASE_TIME = 1388520000000L;
+ /** Maximum number of atomic ids reserved per thread at a time. Must be a power of two! */
+ protected static final int THREAD_RESERVE_SIZE = 0x4000;
+
/**
* Current order. Initialize to current time to make sure that
* local version increments even after restarts.
@@ -63,6 +66,16 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
/** */
private GridCacheVersion ISOLATED_STREAMER_VER;
+ /** Global atomic id counter. */
+ protected final AtomicLong globalAtomicCnt = new AtomicLong();
+
+ /** Per thread atomic id counter. */
+ private final ThreadLocal<LongWrapper> threadAtomicVersionCnt = new ThreadLocal<LongWrapper>() {
+ @Override protected LongWrapper initialValue() {
+ return new LongWrapper(globalAtomicCnt);
+ }
+ };
+
/** */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -304,4 +317,38 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
public GridCacheVersion last() {
return last;
}
+
+ /**
+ * @return Next future id for atomic futures, taken from the calling thread's own {@code LongWrapper} counter.
+ */
+ public long nextAtomicFutureVersion() {
+ LongWrapper cnt = threadAtomicVersionCnt.get();
+ return cnt.getNext();
+ }
+
+ /** Counter that hands out ids from ranges of THREAD_RESERVE_SIZE ids reserved on a shared {@link AtomicLong}. */
+ private static class LongWrapper {
+ /** Next id to return from the currently reserved range. */
+ private long val;
+ private final AtomicLong globalCnt; // Shared counter the ranges are reserved from.
+
+ /** @param globalCnt Shared counter; the first range is reserved from it right away. */
+ public LongWrapper(AtomicLong globalCnt) {
+ assert THREAD_RESERVE_SIZE > 1 && (THREAD_RESERVE_SIZE & (THREAD_RESERVE_SIZE - 1)) == 0 :
+ "THREAD_RESERVE_SIZE must be power of two";
+
+ this.globalCnt = globalCnt;
+ val = globalCnt.getAndAdd(THREAD_RESERVE_SIZE);
+ }
+
+ /** @return Next id; a new range is reserved once {@code val} reaches a multiple of THREAD_RESERVE_SIZE. */
+ public long getNext() {
+ long res = val++;
+
+ if ((val & (THREAD_RESERVE_SIZE - 1)) == 0) // Mask test works because the size is a power of two.
+ val = globalCnt.getAndAdd(THREAD_RESERVE_SIZE);
+
+ return res;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
new file mode 100644
index 0000000..5505b3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.concurrent.locks.LockSupport.park;
+import static java.util.concurrent.locks.LockSupport.unpark;
+
+/**
+ * @param <E>
+ */
+public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
+
+ /** */
+ static final int INITIAL_ARRAY_SIZE = 512;
+ /** */
+ static final Node BLOCKED = new Node();
+
+ /** */
+ final AtomicReference<Node> putStack = new AtomicReference<>();
+ /** */
+ private final AtomicInteger takeStackSize = new AtomicInteger();
+
+ /** */
+ private Thread consumerThread;
+ /** */
+ private Object[] takeStack = new Object[INITIAL_ARRAY_SIZE];
+ /** */
+ private int takeStackIndex = -1;
+
+ /**
+ */
+ public MPSCQueue(Thread consumerThread) {
+ assert consumerThread != null;
+ this.consumerThread = consumerThread;
+ }
+
+ /**
+ */
+ public MPSCQueue() {
+ }
+
+ /**
+ * Sets the consumer thread.
+ *
+ * The consumer thread is needed for blocking, so that an offering thread knows which thread
+ * to wake up. There can only be a single consumerThread and this method should be called
+ * before the queue is safely published. It will not provide a happens-before relation on
+ * its own.
+ *
+ * @param consumerThread the consumer thread; must not be {@code null}.
+ * @throws AssertionError when consumerThread is {@code null} and assertions are enabled.
+ */
+ public void setConsumerThread(Thread consumerThread) {
+ assert consumerThread != null;
+ this.consumerThread = consumerThread;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * This call is thread-safe, but it only drops the items that are on the put-stack; the take-stack is not touched.
+ */
+ @Override public void clear() {
+ putStack.set(BLOCKED); // With BLOCKED as head, the next offer() calls unpark(consumerThread).
+ }
+
+ /** {@inheritDoc}. Pushes the item onto the put-stack with a CAS loop; always returns {@code true}. */
+ @Override public boolean offer(E item) {
+ assert item != null : "item can't be null";
+
+ AtomicReference<Node> putStack = this.putStack;
+ Node newHead = new Node();
+ newHead.item = item;
+
+ for (; ; ) {
+ Node oldHead = putStack.get();
+ if (oldHead == null || oldHead == BLOCKED) { // Empty stack or BLOCKED marker: new node starts a fresh chain.
+ newHead.next = null;
+ newHead.size = 1;
+ }
+ else {
+ newHead.next = oldHead;
+ newHead.size = oldHead.size + 1; // Each head node carries the length of its chain.
+ }
+
+ if (!putStack.compareAndSet(oldHead, newHead))
+ continue; // Head changed concurrently, retry with the fresh head.
+
+ if (oldHead == BLOCKED) // takeAll() installs BLOCKED before parking, so wake the consumer.
+ unpark(consumerThread);
+
+ return true;
+ }
+ }
+
+ /** {@inheritDoc}. */
+ @Override public E peek() {
+ E item = peekNext();
+ if (item != null)
+ return item;
+
+ if (!drainPutStack())
+ return null;
+
+ return peekNext();
+ }
+
+ /** {@inheritDoc}. */
+ @Override public E take() throws InterruptedException {
+ E item = next();
+
+ if (item != null)
+ return item;
+
+ takeAll();
+ assert takeStackIndex == 0;
+ assert takeStack[takeStackIndex] != null;
+
+ return next();
+ }
+
+ /** {@inheritDoc}. */
+ @Override public E poll() {
+ E item = next();
+
+ if (item != null)
+ return item;
+
+ if (!drainPutStack())
+ return null;
+
+ return next();
+ }
+
+ /** */
+ private E next() {
+ E item = peekNext();
+
+ if (item != null)
+ dequeue();
+
+ return item;
+ }
+
+ /** */
+ private E peekNext() {
+ if (takeStackIndex == -1)
+ return null;
+
+ if (takeStackIndex == takeStack.length) {
+ takeStackIndex = -1;
+ return null;
+ }
+
+ E item = (E)takeStack[takeStackIndex];
+
+ if (item == null) {
+ takeStackIndex = -1;
+ return null;
+ }
+
+ return item;
+ }
+
+ /** */
+ private void dequeue() {
+ takeStack[takeStackIndex] = null;
+ takeStackIndex++;
+ takeStackSize.lazySet(takeStackSize.get() - 1);
+ }
+
+ /**
+ * Moves the whole put-stack to the take-stack, parking the calling thread while the put-stack is empty.
+ *
+ * @throws InterruptedException If the consumer thread is found interrupted before a put-stack could be taken.
+ */
+ private void takeAll() throws InterruptedException {
+ AtomicReference<Node> putStack = this.putStack;
+
+ for (; ; ) {
+ if (consumerThread != null && consumerThread.isInterrupted()) {
+ // Remove the BLOCKED marker (if it is the current head) before leaving.
+ putStack.compareAndSet(BLOCKED, null);
+ throw new InterruptedException();
+ }
+
+ Node currentPutStackHead = putStack.get();
+
+ if (currentPutStackHead == null) {
+ // there is nothing to take, so lets block.
+ if (!putStack.compareAndSet(null, BLOCKED)) {
+ // we are lucky, something is available
+ continue;
+ }
+
+ // lets block for real.
+ park();
+ }
+ else if (currentPutStackHead == BLOCKED)
+ park();
+
+ else {
+ if (!putStack.compareAndSet(currentPutStackHead, null))
+ continue;
+
+ copyIntoTakeStack(currentPutStackHead);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Moves the current put-stack, if it holds any items, to the take-stack without blocking.
+ *
+ * @return {@code true} if items were moved to the take-stack, {@code false} if the put-stack held no items.
+ */
+ private boolean drainPutStack() {
+ for (; ; ) {
+ Node head = putStack.get();
+
+ if (head == null)
+ return false;
+
+ if (head == BLOCKED) {
+ // BLOCKED is a marker (clear() installs it), not an item. Copying it would hit
+ // copyIntoTakeStack() with size 0 and trip its 'takeStack[0] != null' assertion.
+ if (putStack.compareAndSet(BLOCKED, null))
+ return false;
+
+ // An offer() replaced the marker in the meantime, retry with the new head.
+ continue;
+ }
+
+ if (putStack.compareAndSet(head, null)) {
+ copyIntoTakeStack(head);
+ return true;
+ }
+ }
+ }
+
+ /** Copies the put-stack chain into the take-stack array in reverse, so the node at the end of the chain lands at index 0. */
+ private void copyIntoTakeStack(Node putStackHead) {
+ int putStackSize = putStackHead.size;
+
+ takeStackSize.lazySet(putStackSize);
+
+ if (putStackSize > takeStack.length) // Array too small for this batch: replace it with a larger one.
+ takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
+
+ for (int i = putStackSize - 1; i >= 0; i--) { // Head is the most recently offered item, so it goes last.
+ takeStack[i] = putStackHead.item;
+ putStackHead = putStackHead.next;
+ }
+
+ takeStackIndex = 0;
+ assert takeStack[0] != null;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * Best effort implementation.
+ */
+ @Override public int size() {
+ Node h = putStack.get();
+ int putStackSize = h == null ? 0 : h.size;
+ return putStackSize + takeStackSize.get();
+ }
+
+ /** {@inheritDoc}. */
+ @Override public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ /** {@inheritDoc}. */
+ @Override public void put(E e) throws InterruptedException {
+ offer(e);
+ }
+
+ /** {@inheritDoc}. */
+ @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+ add(e);
+ return true;
+ }
+
+ /** {@inheritDoc}. */
+ @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc}. */
+ @Override public int remainingCapacity() {
+ return Integer.MAX_VALUE;
+ }
+
+ /** {@inheritDoc}. */
+ @Override public int drainTo(Collection<? super E> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc}. */
+ @Override public int drainTo(Collection<? super E> c, int maxElements) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc}. */
+ @Override public Iterator<E> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** */
+ private static int nextPowerOfTwo(final int value) {
+ return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+ }
+
+ /** */
+ private static final class Node<E> {
+ /** */
+ Node next;
+ /** */
+ E item;
+ /** */
+ int size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 6c85b32..af474ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -38,8 +38,9 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteStripeThread;
import org.jetbrains.annotations.NotNull;
+import org.jsr166.LongAdder8;
/**
* Striped executor.
@@ -75,7 +76,7 @@ public class StripedExecutor implements ExecutorService {
try {
for (int i = 0; i < cnt; i++) {
- stripes[i] = new StripeConcurrentQueue(
+ stripes[i] = new StripeMPSCQueue(
igniteInstanceName,
poolName,
i,
@@ -146,6 +147,31 @@ public class StripedExecutor implements ExecutorService {
}
/**
+ * @return Space-separated groups "completed", "park", "unpark" and "queue", each followed by one ':'-prefixed value per stripe.
+ */
+ public String getMetrics() {
+ GridStringBuilder sb = new GridStringBuilder();
+ sb.a("completed");
+ GridStringBuilder sb2 = new GridStringBuilder();
+ sb2.a("park");
+ GridStringBuilder sb3 = new GridStringBuilder();
+ sb3.a("unpark");
+ GridStringBuilder sb4 = new GridStringBuilder();
+ sb4.a("queue");
+
+ for (int i = 0; i < stripes.length; i++) {
+ Stripe stripe = stripes[i];
+
+ sb.a(':').a(stripe.completedCnt);
+ sb2.a(':').a(stripe.parkCntr());
+ sb3.a(':').a(stripe.unparkCntr());
+ sb4.a(':').a(stripe.queueSize());
+ }
+
+ return sb.a(' ').a(sb2).a(' ').a(sb3).a(' ').a(sb4).toString();
+ }
+
+ /**
* @return Stripes count.
*/
public int stripes() {
@@ -435,10 +461,9 @@ public class StripedExecutor implements ExecutorService {
* Starts the stripe.
*/
void start() {
- thread = new IgniteThread(igniteInstanceName,
+ thread = new IgniteStripeThread(igniteInstanceName,
poolName + "-stripe-" + idx,
this,
- IgniteThread.GRP_IDX_UNASSIGNED,
idx);
thread.start();
@@ -524,6 +549,20 @@ public class StripedExecutor implements ExecutorService {
*/
abstract String queueToString();
+ /**
+ * @return Number of park ops.
+ */
+ public long parkCntr() {
+ return 0;
+ }
+
+ /**
+ * @return Number of unpark ops.
+ */
+ public long unparkCntr() {
+ return 0;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Stripe.class, this);
@@ -540,6 +579,12 @@ public class StripedExecutor implements ExecutorService {
/** */
private volatile boolean parked;
+ /** */
+ private volatile long parkCntr;
+
+ /** */
+ private final LongAdder8 unparkCntr = new LongAdder8();
+
/**
* @param igniteInstanceName Ignite instance name.
* @param poolName Pool name.
@@ -578,6 +623,7 @@ public class StripedExecutor implements ExecutorService {
if (r != null)
return r;
+ parkCntr++;
LockSupport.park();
if (Thread.interrupted())
@@ -593,8 +639,10 @@ public class StripedExecutor implements ExecutorService {
void execute(Runnable cmd) {
queue.add(cmd);
- if (parked)
+ if (parked) {
+ unparkCntr.increment();
LockSupport.unpark(thread);
+ }
}
/** {@inheritDoc} */
@@ -608,6 +656,16 @@ public class StripedExecutor implements ExecutorService {
}
/** {@inheritDoc} */
+ @Override public long parkCntr() {
+ return parkCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long unparkCntr() {
+ return unparkCntr.sum();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(StripeConcurrentQueue.class, this, super.toString());
}
@@ -719,4 +777,61 @@ public class StripedExecutor implements ExecutorService {
return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
}
}
+
+ /**
+ * Stripe whose tasks are kept in an {@link MPSCQueue}; the stripe thread is registered as the queue's consumer.
+ */
+ private static class StripeMPSCQueue extends Stripe {
+ /** Queue. */
+ private final MPSCQueue<Runnable> queue = new MPSCQueue<>();
+
+ /**
+ * @param gridName Grid name.
+ * @param poolName Pool name.
+ * @param idx Stripe index.
+ * @param log Logger.
+ */
+ public StripeMPSCQueue(
+ String gridName,
+ String poolName,
+ int idx,
+ IgniteLogger log
+ ) {
+ super(gridName,
+ poolName,
+ idx,
+ log);
+ }
+
+ /** {@inheritDoc} */
+ @Override void start() {
+ super.start();
+
+ // NOTE(review): the consumer thread is registered with the queue only after super.start()
+ // has been called - confirm nothing can offer to or take from the queue before this line.
+ queue.setConsumerThread(thread);
+ }
+
+ /** {@inheritDoc} */
+ @Override Runnable take() throws InterruptedException {
+ return queue.take();
+ }
+
+ /** {@inheritDoc} */
+ @Override void execute(Runnable cmd) {
+ queue.add(cmd);
+ }
+
+ /** {@inheritDoc} */
+ @Override int queueSize() {
+ return queue.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override String queueToString() {
+ return String.valueOf(queue);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StripeMPSCQueue.class, this, super.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
new file mode 100644
index 0000000..75b655f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.thread;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * {@link IgniteThread} that additionally carries a stripe index.
+ */
+public class IgniteStripeThread extends IgniteThread {
+
+ /** Stripe index. */
+ private final int stripeIdx;
+
+ /** @param stripeIdx Stripe index; the other arguments are passed on to the {@link IgniteThread} constructor. */
+ public IgniteStripeThread(String gridName, String threadName, Runnable r, int stripeIdx) {
+ super(gridName, threadName, r);
+ this.stripeIdx = stripeIdx;
+ }
+
+ /**
+ * @return Stripe index.
+ */
+ public int stripeIndex() {
+ return stripeIdx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteStripeThread.class, this, "name", getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
new file mode 100644
index 0000000..d6e7537
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Trivial GridNearAtomicFullUpdateRequest tests.
+ */
+public class GridNearAtomicFullUpdateRequestTest extends MarshallingAbstractTest {
+ /**
+ * Message marshalling test.
+ *
+ * @throws IgniteCheckedException If fails.
+ */
+ public void testMarshall() throws IgniteCheckedException {
+ GridCacheVersion updVer = new GridCacheVersion(1, 2, 3, 5);
+
+ int entryNum = 3;
+
+ GridNearAtomicFullUpdateRequest msg = new GridNearAtomicFullUpdateRequest(555,
+ UUID.randomUUID(),
+ 555L,
+ new AffinityTopologyVersion(25, 5),
+ false,
+ CacheWriteSynchronizationMode.PRIMARY_SYNC,
+ GridCacheOperation.UPDATE,
+ false,
+ null,
+ null,
+ null,
+ null,
+ 555,
+ false,
+ false,
+ true,
+ false,
+ 3,
+ entryNum);
+
+ for (int i = 0; i < entryNum; i++)
+ msg.addUpdateEntry(
+ key(i, i),
+ val(i),
+ -1,
+ -1,
+ null
+ );
+
+ GridNearAtomicFullUpdateRequest received = marshalUnmarshal(msg);
+
+ assertEquals(555, received.futureId());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
new file mode 100644
index 0000000..f53a32b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryContext;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public abstract class MarshallingAbstractTest extends TestCase {
+ /** */
+ private static final byte proto = 2;
+
+ /** */
+ private GridCacheSharedContext ctx;
+
+ /** */
+ private GridCacheContext cctx;
+
+ /** */
+ @Override protected void setUp() throws Exception {
+ super.setUp();
+ ctx = mock(GridCacheSharedContext.class);
+ cctx = mock(GridCacheContext.class);
+
+ CacheObjectBinaryContext coctx = mock(CacheObjectBinaryContext.class);
+ GridKernalContext kctx = mock(GridKernalContext.class);
+ IgniteEx ignite = mock(IgniteEx.class);
+ GridCacheProcessor proc = mock(GridCacheProcessor.class);
+ GridCacheSharedContext shared = mock(GridCacheSharedContext.class);
+
+ Marshaller marsh = new BinaryMarshaller();
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+ cfg.setMarshaller(marsh);
+
+ when(ctx.cacheContext(anyInt())).thenReturn(cctx);
+ when(ctx.marshaller()).thenReturn(marsh);
+ when(ctx.gridConfig()).thenReturn(cfg);
+
+ when(cctx.cacheObjectContext()).thenReturn(coctx);
+ when(cctx.gridConfig()).thenReturn(cfg);
+ when(cctx.shared()).thenReturn(shared);
+
+ when(cctx.grid()).thenReturn(ignite);
+ when(kctx.grid()).thenReturn(ignite);
+
+ when(ignite.configuration()).thenReturn(cfg);
+
+ when(ctx.kernalContext()).thenReturn(kctx);
+ when(cctx.kernalContext()).thenReturn(kctx);
+ when(coctx.kernalContext()).thenReturn(kctx);
+
+ when(coctx.binaryEnabled()).thenReturn(true);
+
+ when(kctx.cache()).thenReturn(proc);
+ when(kctx.config()).thenReturn(cfg);
+
+ when(proc.context()).thenReturn(ctx);
+
+ IgniteCacheObjectProcessor binaryProcessor = new CacheObjectBinaryProcessorImpl(kctx);
+ when(kctx.cacheObjects()).thenReturn(binaryProcessor);
+
+ // init marshaller
+ marsh.setContext(new MarshallerContextTestImpl());
+
+ when(shared.marshaller()).thenReturn(marsh);
+
+ BinaryContext bctx =
+ new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), new NullLogger());
+
+ IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", bctx, new IgniteConfiguration());
+ }
+
+ /**
+ * @param m Message to write into a byte buffer and read back.
+ * @return Message instance created by the message factory and populated from the buffer.
+ */
+ protected <T extends GridCacheMessage> T marshalUnmarshal(T m) throws IgniteCheckedException {
+ ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
+
+ m.prepareMarshal(ctx);
+ m.writeTo(buf, new DirectMessageWriter(proto));
+
+ System.out.println("Binary size: " + buf.position() + " bytes");
+ buf.flip();
+
+ byte type = buf.get(); // First byte of the buffer is compared with the message's direct type below.
+ assertEquals(m.directType(), type);
+
+ MessageFactory msgFactory = new GridIoMessageFactory(null);
+
+ Message mx = msgFactory.create(type);
+ mx.readFrom(buf, new DirectMessageReader(msgFactory, proto));
+ ((GridCacheMessage)mx).finishUnmarshal(ctx, U.gridClassLoader());
+
+ return (T)mx;
+ }
+
+ /** Wraps the value into a {@link KeyCacheObjectImpl}; {@code part} is passed as its third constructor argument. */
+ protected KeyCacheObject key(Object val, int part) {
+ return new KeyCacheObjectImpl(val, null, part);
+ }
+
+ protected CacheObject val(Object val) { // Wraps the value into a CacheObjectImpl.
+ return new CacheObjectImpl(val, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
new file mode 100644
index 0000000..f555750
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import junit.framework.TestCase;
+
+/**
+ * Simple tests for {@link GridCacheVersionManager}.
+ */
+public class GridCacheVersionManagerTest extends TestCase {
+ /**
+ * Test for {@link GridCacheVersionManager#nextAtomicFutureVersion()}: a single thread gets consecutive ids starting at 0.
+ */
+ public void testNextAtomicIdMonotonicalGrows() {
+ GridCacheVersionManager mgr = new GridCacheVersionManager();
+ int n = GridCacheVersionManager.THREAD_RESERVE_SIZE;
+
+ for (int i = 0; i < n * 3 + 5; i++) {
+ long l = mgr.nextAtomicFutureVersion();
+ assertEquals(i, l);
+ }
+
+ assertEquals(n * 4, mgr.globalAtomicCnt.get()); // Four ranges of n ids are expected to be reserved by now.
+ }
+
+ /**
+ * Test for for {@link GridCacheVersionManager#nextAtomicFutureVersion()} with multiple threads.
+ *
+ * @throws InterruptedException if fails.
+ */
+ public void testNextAtomicMultiThread() throws InterruptedException {
+ int threadsNum = 3;
+
+ final GridCacheVersionManager mgr = new GridCacheVersionManager();
+ final int n = GridCacheVersionManager.THREAD_RESERVE_SIZE;
+ final int perThreadNum = n * 2 + 10;
+
+ final int[] vals = new int[n * threadsNum * (perThreadNum / n + 1)];
+
+ ExecutorService executorService = Executors.newFixedThreadPool(threadsNum);
+
+ final CountDownLatch latch = new CountDownLatch(threadsNum);
+
+ for (int i = 0; i < threadsNum; i++) {
+ executorService.submit(new Runnable() {
+ @Override public void run() {
+ for (int i = 0; i < perThreadNum; i++) {
+ long l = mgr.nextAtomicFutureVersion();
+ vals[(int)l]++;
+ }
+ latch.countDown();
+ }
+ });
+ }
+
+ latch.await();
+ executorService.shutdown();
+
+ int cnt = 0;
+
+ for (int i = 0; i < vals.length; i++) {
+ if (vals[i] > 0)
+ cnt++;
+ }
+
+ assertEquals(threadsNum * perThreadNum, cnt);
+ }
+}
[09/10] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-2.0' into ignite-4680-tmp
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4680-tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae9737b4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae9737b4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae9737b4
Branch: refs/heads/ignite-4680-sb
Commit: ae9737b4293479a976bd11fe7169a2e78f0dd1e0
Parents: 3200c2e 9020d12
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 17:50:18 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 17:50:18 2017 +0300
----------------------------------------------------------------------
.../ClientAbstractMultiNodeSelfTest.java | 13 +-
.../ignite/internal/IgniteTransactionsEx.java | 8 +-
.../processors/cache/GridCacheAdapter.java | 98 +-
.../processors/cache/GridCacheProxyImpl.java | 6 +-
.../cache/GridCacheSharedContext.java | 11 +-
.../processors/cache/GridCacheUtils.java | 6 +-
.../processors/cache/IgniteInternalCache.java | 5 +-
.../distributed/GridCacheCommittedTxInfo.java | 117 -
.../GridDistributedCacheAdapter.java | 2 +-
.../GridDistributedTxRemoteAdapter.java | 59 +-
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 57 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 126 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 28 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 65 +-
.../dht/colocated/GridDhtColocatedCache.java | 8 +-
.../colocated/GridDhtColocatedLockFuture.java | 7 +-
.../distributed/near/GridNearLockFuture.java | 4 +-
.../distributed/near/GridNearLockRequest.java | 200 +-
.../near/GridNearTransactionalCache.java | 6 +-
.../near/GridNearTxFinishFuture.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 2732 ++++++++++++++++-
.../near/GridNearTxPrepareFutureAdapter.java | 5 +-
.../near/GridNearTxPrepareRequest.java | 2 +-
.../distributed/near/GridNearTxRemote.java | 4 +-
.../store/GridCacheStoreManagerAdapter.java | 142 +-
.../cache/transactions/IgniteInternalTx.java | 80 +-
.../transactions/IgniteTransactionsImpl.java | 12 +-
.../cache/transactions/IgniteTxAdapter.java | 165 +-
.../cache/transactions/IgniteTxEntry.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 67 +-
.../IgniteTxImplicitSingleStateImpl.java | 4 +-
.../transactions/IgniteTxLocalAdapter.java | 2801 ++----------------
.../cache/transactions/IgniteTxLocalEx.java | 145 +-
.../cache/transactions/IgniteTxManager.java | 293 +-
.../cache/transactions/IgniteTxRemoteEx.java | 11 +
.../IgniteTxRemoteStateAdapter.java | 2 +-
.../cache/transactions/IgniteTxState.java | 2 +-
.../cache/transactions/IgniteTxStateImpl.java | 4 +-
.../transactions/TransactionProxyImpl.java | 13 +-
.../datastructures/DataStructuresProcessor.java | 32 +-
.../datastructures/GridCacheAtomicLongImpl.java | 18 +-
.../GridCacheAtomicReferenceImpl.java | 6 +-
.../GridCacheAtomicSequenceImpl.java | 4 +-
.../GridCacheAtomicStampedImpl.java | 6 +-
.../GridCacheCountDownLatchImpl.java | 6 +-
.../datastructures/GridCacheLockImpl.java | 11 +-
.../datastructures/GridCacheSemaphoreImpl.java | 18 +-
.../GridTransactionalCacheQueueImpl.java | 10 +-
.../processors/igfs/IgfsDataManager.java | 55 +-
.../processors/igfs/IgfsMetaManager.java | 73 +-
.../service/GridServiceProcessor.java | 6 +-
.../internal/TestRecordingCommunicationSpi.java | 29 +
.../processors/cache/GridCacheTestStore.java | 2 -
.../cache/IgniteTxConfigCacheSelfTest.java | 4 +-
.../IgniteCacheSystemTransactionsSelfTest.java | 7 +-
.../IgniteTxCachePrimarySyncTest.java | 5 +
...xOriginatingNodeFailureAbstractSelfTest.java | 6 +-
...cOriginatingNodeFailureAbstractSelfTest.java | 4 +-
...ePrimaryNodeFailureRecoveryAbstractTest.java | 9 +-
.../dht/IgniteCacheTxRecoveryRollbackTest.java | 501 ++++
.../GridCachePartitionedTxSalvageSelfTest.java | 7 +-
.../TxOptimisticDeadlockDetectionTest.java | 30 +-
...lockMessageSystemPoolStarvationSelfTest.java | 6 +-
.../IgniteCacheRestartTestSuite2.java | 3 +-
.../IgniteCacheTxRecoverySelfTestSuite.java | 3 +
.../apache/ignite/util/GridRandomSelfTest.java | 17 -
.../HibernateReadWriteAccessStrategy.java | 12 +-
.../processors/cache/jta/CacheJtaManager.java | 3 +-
.../processors/cache/jta/CacheJtaResource.java | 8 +-
.../Plugin/PluginTest.cs | 24 +
.../Impl/Plugin/PluginContext.cs | 9 +
.../Apache.Ignite.Core/Plugin/IPluginContext.cs | 8 +
73 files changed, 4191 insertions(+), 4073 deletions(-)
----------------------------------------------------------------------
[03/10] ignite git commit: ignite-4680-2 wip
Posted by sb...@apache.org.
ignite-4680-2 wip
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8fa6dd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8fa6dd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8fa6dd4
Branch: refs/heads/ignite-4680-sb
Commit: d8fa6dd4fae51532a4e04ffbfe6c6fa3584520bf
Parents: d1b4ebd
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Wed Mar 15 20:44:46 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Wed Mar 15 20:44:46 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 14 +++---
.../ignite/internal/util/StripedExecutor.java | 5 ++-
.../ignite/thread/IgniteStripeThread.java | 47 --------------------
3 files changed, 10 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8fa6dd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4ab5000..aeb379a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -101,7 +101,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
-import org.apache.ignite.thread.IgniteStripeThread;
+import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -280,11 +280,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicAbstractUpdateRequest req
) {
int stripeIdx;
-
- if (req instanceof GridNearAtomicFullUpdateRequest && Thread.currentThread() instanceof IgniteStripeThread)
- stripeIdx = ((IgniteStripeThread)Thread.currentThread()).stripeIndex();
+ Thread curTrd = Thread.currentThread();
+ if (req instanceof GridNearAtomicFullUpdateRequest && curTrd instanceof IgniteThread)
+ stripeIdx = ((IgniteThread)curTrd).stripe();
else
- stripeIdx = IgniteStripeThread.GRP_IDX_UNASSIGNED;
+ stripeIdx = IgniteThread.GRP_IDX_UNASSIGNED;
processNearAtomicUpdateRequest(
nodeId,
@@ -1660,7 +1660,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridNearAtomicAbstractUpdateRequest req,
final UpdateReplyClosure completionCb
) {
- updateAllAsyncInternal(nodeId, req, IgniteStripeThread.GRP_IDX_UNASSIGNED, completionCb);
+ updateAllAsyncInternal(nodeId, req, IgniteThread.GRP_IDX_UNASSIGNED, completionCb);
}
/**
@@ -1772,7 +1772,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int[] stripeIdxs = null;
- if (stripeIdx != IgniteStripeThread.GRP_IDX_UNASSIGNED
+ if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
&& req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
&& ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8fa6dd4/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index af474ff..28bb584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -38,7 +38,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.thread.IgniteStripeThread;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jsr166.LongAdder8;
@@ -461,9 +461,10 @@ public class StripedExecutor implements ExecutorService {
* Starts the stripe.
*/
void start() {
- thread = new IgniteStripeThread(igniteInstanceName,
+ thread = new IgniteThread(igniteInstanceName,
poolName + "-stripe-" + idx,
this,
+ IgniteThread.GRP_IDX_UNASSIGNED,
idx);
thread.start();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8fa6dd4/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
deleted file mode 100644
index 75b655f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.thread;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Class for use within {@link IgniteThreadPoolExecutor} class.
- */
-public class IgniteStripeThread extends IgniteThread {
-
- /** Group index. */
- private final int stripeIdx;
-
- /** {@inheritDoc} */
- public IgniteStripeThread(String gridName, String threadName, Runnable r, int stripeIdx) {
- super(gridName, threadName, r);
- this.stripeIdx = stripeIdx;
- }
-
- /**
- * @return Group index.
- */
- public int stripeIndex() {
- return stripeIdx;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteStripeThread.class, this, "name", getName());
- }
-}
[05/10] ignite git commit: wip
Posted by sb...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/855c66b1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/855c66b1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/855c66b1
Branch: refs/heads/ignite-4680-sb
Commit: 855c66b11b8eded1f3ff3e4ff1df587c724c5c70
Parents: f33235d
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 13:44:38 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 13:44:38 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 6 ++-
.../dht/atomic/NearAtomicResponseHelper.java | 55 +++++++++-----------
2 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/855c66b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 05d85ad..e68d72d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -214,8 +214,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (req.writeSynchronizationMode() != FULL_ASYNC) {
if (req.responseHelper() != null) {
- if (req.responseHelper().addResponse(res))
- sendNearUpdateReply(res.nodeId(), req.responseHelper().response());
+ GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+
+ if (res0 != null)
+ sendNearUpdateReply(res.nodeId(), res0);
}
else
sendNearUpdateReply(res.nodeId(), res);
http://git-wip-us.apache.org/repos/asf/ignite/blob/855c66b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
index 9e35e8f..00c9f6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -42,45 +42,42 @@ public class NearAtomicResponseHelper {
* @param res Response.
* @return {@code true} if all responses added.
*/
- public boolean addResponse(GridNearAtomicUpdateResponse res) {
+ public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
synchronized (this) {
- if (res.stripe() == -1) {
- this.res = res;
- this.res.stripe(-1);
-
- return true;
- }
+ if (res.stripe() == -1)
+ return res;
if (stripes.remove(res.stripe())) {
- if (this.res == null)
- this.res = res;
- else {
- if (res.nearValuesIndexes() != null)
- for (int i = 0; i < res.nearValuesIndexes().size(); i++)
- this.res.addNearValue(
- res.nearValuesIndexes().get(i),
- res.nearValue(i),
- res.nearTtl(i),
- res.nearExpireTime(i)
- );
+ mergeResponse(res);
- if (res.failedKeys() != null)
- this.res.addFailedKeys(res.failedKeys(), null);
-
- if (res.skippedIndexes() != null)
- this.res.skippedIndexes().addAll(res.skippedIndexes());
- }
- return stripes.isEmpty();
+ return stripes.isEmpty() ? this.res : null;
}
- return false;
+ return null;
}
}
/**
- * @return Response.
+ * @param res Response.
*/
- public GridNearAtomicUpdateResponse response() {
- return res;
+ private void mergeResponse(GridNearAtomicUpdateResponse res) {
+ if (this.res == null)
+ this.res = res;
+ else {
+ if (res.nearValuesIndexes() != null)
+ for (int i = 0; i < res.nearValuesIndexes().size(); i++)
+ this.res.addNearValue(
+ res.nearValuesIndexes().get(i),
+ res.nearValue(i),
+ res.nearTtl(i),
+ res.nearExpireTime(i)
+ );
+
+ if (res.failedKeys() != null)
+ this.res.addFailedKeys(res.failedKeys(), null);
+
+ if (res.skippedIndexes() != null)
+ this.res.skippedIndexes().addAll(res.skippedIndexes());
+ }
}
}
[04/10] ignite git commit: ignite-4680 one-message-back
Posted by sb...@apache.org.
ignite-4680 one-message-back
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f33235db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f33235db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f33235db
Branch: refs/heads/ignite-4680-sb
Commit: f33235dbda1b3081468b2d00a5337d27f77aa930
Parents: d8fa6dd
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 13:10:06 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 13:10:06 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 5 ++
.../dht/atomic/GridDhtAtomicCache.java | 10 ++-
.../GridNearAtomicAbstractUpdateFuture.java | 62 ++++----------
.../GridNearAtomicAbstractUpdateRequest.java | 19 +++++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 18 ++--
.../atomic/GridNearAtomicUpdateResponse.java | 17 +---
.../dht/atomic/NearAtomicResponseHelper.java | 86 ++++++++++++++++++++
7 files changed, 140 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 5fd2845..39c514b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearAtomicResponseHelper;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -829,6 +830,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
if (stripemap != null) {
+ GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
+
+ msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
+
for (Integer stripe : stripemap.keySet()) {
stripedExecutor.execute(stripe, c);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aeb379a..05d85ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -212,8 +212,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updateReplyClos = new UpdateReplyClosure() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
- if (req.writeSynchronizationMode() != FULL_ASYNC)
- sendNearUpdateReply(res.nodeId(), res);
+ if (req.writeSynchronizationMode() != FULL_ASYNC) {
+ if (req.responseHelper() != null) {
+ if (req.responseHelper().addResponse(res))
+ sendNearUpdateReply(res.nodeId(), req.responseHelper().response());
+ }
+ else
+ sendNearUpdateReply(res.nodeId(), res);
+ }
else {
if (res.remapTopologyVersion() != null)
// Remap keys on primary node in FULL_ASYNC mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 770999a..2d02795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -357,6 +356,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @return Response to notify about primary failure.
*/
final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
+// assert req == null : req;
assert req.nodeId() != null : req;
if (msgLog.isDebugEnabled()) {
@@ -426,9 +426,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
private GridNearAtomicUpdateResponse res;
/** */
- private Map<Integer, GridNearAtomicUpdateResponse> resMap;
-
- /** */
@GridToStringInclude
Set<UUID> dhtNodes;
@@ -437,7 +434,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
private Set<UUID> rcvd;
/** */
- private boolean hasDhtRes;
+ private boolean hasRes;
/**
* @param req Request.
@@ -522,24 +519,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
- * @return {@code True} if all near results gathered.
- */
- private boolean hasAllNearResults() {
- return res != null || (resMap != null && resMap.size() == req.stripes());
- }
-
- public String state() {
- return resMap != null ? resMap.size() + "/" + req.stripes() : res != null ? "res" : "no";
- }
-
- /**
* @return {@code True} if all expected responses are received.
*/
private boolean finished() {
if (req.writeSynchronizationMode() == PRIMARY_SYNC)
- return hasAllNearResults();
+ return hasRes;
- return (dhtNodes != null && dhtNodes.isEmpty()) && hasDhtRes;
+ return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
}
/**
@@ -559,7 +545,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return req;
}
- return hasAllNearResults() ? req : null;
+ return this.res == null ? req : null;
}
/**
@@ -576,7 +562,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
if (finished())
return null;
- return !hasAllNearResults() ? req : null;
+ return this.res == null ? req : null;
}
/**
@@ -588,7 +574,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return DhtLeftResult.NOT_DONE;
if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
- if (hasDhtRes)
+ if (hasRes)
return DhtLeftResult.DONE;
else
return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
@@ -609,7 +595,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return false;
if (res.hasResult())
- hasDhtRes = true;
+ hasRes = true;
if (dhtNodes == null) {
if (rcvd == null)
@@ -631,6 +617,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
assert !finished() : this;
+ hasRes = true;
+
boolean onRes = storeResponse(res);
assert onRes;
@@ -654,7 +642,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param cctx Context.
*/
private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
- assert F.isEmpty(dhtNodes) || req.initMappingLocally();
+ assert dhtNodes == null || req.initMappingLocally();
Set<UUID> dhtNodes0 = dhtNodes;
@@ -691,22 +679,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @return {@code True} if current response was {@code null}.
*/
private boolean storeResponse(GridNearAtomicUpdateResponse res) {
- if (res.stripe() > -1) {
- if (resMap == null)
- resMap = U.newHashMap(req.stripes());
+ if (this.res == null) {
+ this.res = res;
- if (!resMap.containsKey(res.stripe())) {
- resMap.put(res.stripe(), res);
- return true;
- }
- }
- else {
- if (this.res == null) {
- this.res = res;
-// this.resMap = null;
-
- return true;
- }
+ return true;
}
return false;
@@ -717,7 +693,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
*/
void resetResponse() {
res = null;
- resMap = null;
}
/**
@@ -727,19 +702,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return res;
}
- /**
- * @return Response.
- */
- @Nullable public Map<Integer, GridNearAtomicUpdateResponse> responses() {
- return resMap;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PrimaryRequestState.class, this,
"primary", primaryId(),
"needPrimaryRes", req.needPrimaryResponse(),
- "primaryRes", this.res != null,
+ "primaryRes", res != null,
"done", finished());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index b549370..0748434 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -89,6 +89,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
@GridToStringExclude
protected byte flags;
+ /** Response helper. */
+ @GridDirectTransient
+ private NearAtomicResponseHelper responseHelper;
+
/**
*
*/
@@ -419,6 +423,21 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
+ * @return Response helper.
+ */
+ public NearAtomicResponseHelper responseHelper() {
+ return responseHelper;
+ }
+
+ /**
+ * @param responseHelper Response helper.
+ */
+ public void responseHelper(
+ NearAtomicResponseHelper responseHelper) {
+ this.responseHelper = responseHelper;
+ }
+
+ /**
* @param idx Key index.
* @return Key.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9c4de66..8774d5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -371,7 +371,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
- boolean rcvAll = false;
+ boolean rcvAll;
synchronized (mux) {
if (futId == -1 || futId != res.futureId())
@@ -412,8 +412,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (msgLog.isDebugEnabled())
msgLog.debug("Processed near atomic update response " +
"[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() +
- ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") +
- " " + reqState.state() + ']');
+ ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") + ']');
}
else
return;
@@ -472,18 +471,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (rcvAll && nearEnabled) {
if (mappings != null) {
for (PrimaryRequestState reqState : mappings.values()) {
- if (reqState.responses() != null) {
- for (GridNearAtomicUpdateResponse res0 : reqState.responses().values()) {
- updateNear(reqState.req, res0);
- }
- }
- else {
- GridNearAtomicUpdateResponse res0 = reqState.response();
+ GridNearAtomicUpdateResponse res0 = reqState.response();
- assert res0 != null : reqState;
+ assert res0 != null : reqState;
- updateNear(reqState.req, res0);
- }
+ updateNear(reqState.req, res0);
}
}
else if (!nodeErr)
http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index f8b45d3..f0f954c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -97,6 +97,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
private int partId = -1;
/** Stripe. */
+ @GridDirectTransient
private int stripe = -1;
/** */
@@ -548,12 +549,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
- case 15:
- if (!writer.writeInt("stripe", stripe))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -666,14 +661,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
- case 15:
- stripe = reader.readInt("stripe");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
@@ -686,7 +673,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 16;
+ return 15;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
new file mode 100644
index 0000000..9e35e8f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
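+ * Accumulates per-stripe {@link GridNearAtomicUpdateResponse}s and merges them into a single response.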
+ */
+public class NearAtomicResponseHelper {
+
+ /** */
+ private GridNearAtomicUpdateResponse res;
+
+ /** */
+ private Set<Integer> stripes;
+
+ /**
+ * @param stripes Stripes collection.
+ */
+ public NearAtomicResponseHelper(Set<Integer> stripes) {
+ this.stripes = new HashSet<>(stripes);
+ }
+
+ /**
+ * @param res Response.
+ * @return {@code true} if all responses added.
+ */
+ public boolean addResponse(GridNearAtomicUpdateResponse res) {
+ synchronized (this) {
+ if (res.stripe() == -1) {
+ this.res = res;
+ this.res.stripe(-1);
+
+ return true;
+ }
+
+ if (stripes.remove(res.stripe())) {
+ if (this.res == null)
+ this.res = res;
+ else {
+ if (res.nearValuesIndexes() != null)
+ for (int i = 0; i < res.nearValuesIndexes().size(); i++)
+ this.res.addNearValue(
+ res.nearValuesIndexes().get(i),
+ res.nearValue(i),
+ res.nearTtl(i),
+ res.nearExpireTime(i)
+ );
+
+ if (res.failedKeys() != null)
+ this.res.addFailedKeys(res.failedKeys(), null);
+
+ if (res.skippedIndexes() != null)
+ this.res.skippedIndexes().addAll(res.skippedIndexes());
+ }
+ return stripes.isEmpty();
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * @return Response.
+ */
+ public GridNearAtomicUpdateResponse response() {
+ return res;
+ }
+}
[08/10] ignite git commit: tmp
Posted by sb...@apache.org.
tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3200c2e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3200c2e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3200c2e4
Branch: refs/heads/ignite-4680-sb
Commit: 3200c2e43a9b2cee1d35a19f749de50f19f5d0d4
Parents: 5f51839
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 16:59:26 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 17:46:01 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 36 +--
.../dht/atomic/GridDhtAtomicCache.java | 265 +++++++++----------
.../dht/atomic/NearAtomicResponseHelper.java | 29 +-
3 files changed, 151 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6dad30b..17ae595 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -856,23 +856,25 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// }
- if (plc == GridIoPolicy.SYSTEM_POOL &&
- (msg.partition() != Integer.MIN_VALUE ||
- msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
- Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
- ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
-
- if (stripemap != null) {
- GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
-
- msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
-
- for (Integer stripe : stripemap.keySet()) {
- stripedExecutor.execute(stripe, c);
- }
- }
- else
- stripedExecutor.execute(msg.partition(), c);
+ if (plc == GridIoPolicy.SYSTEM_POOL && (msg.partition() != Integer.MIN_VALUE)) {
+// Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+// ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+//
+// if (stripemap != null) {
+// GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
+//
+// msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
+//
+// for (Integer stripe : stripemap.keySet()) {
+// stripedExecutor.execute(stripe, c);
+// }
+// }
+// else
+
+// if (msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)
+// stripedExecutor.execute(((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap().keySet().iterator().next(), c);
+// else
+ stripedExecutor.execute(msg.partition(), c);
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index dcc79d0..bcfea79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -213,14 +213,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (req.writeSynchronizationMode() != FULL_ASYNC) {
- if (req.responseHelper() != null) {
- GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
-
- if (res0 != null)
- sendNearUpdateReply(res.nodeId(), res0);
- }
- else
- sendNearUpdateReply(res.nodeId(), res);
+ sendNearUpdateReply(res.nodeId(), res);
}
else {
if (res.remapTopologyVersion() != null)
@@ -1684,22 +1677,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final int stripeIdx,
final UpdateReplyClosure completionCb
) {
- IgniteInternalFuture<Object> forceFut;
-
- if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
- && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
- && req.stripeMap() != null) {
- int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+// if (true) {
+// updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
+//
+// return;
+// }
- List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
-
- for (int i = 0; i < stripeIdxs.length; i++)
- keys.add(req.key(stripeIdxs[i]));
-
- forceFut = preldr.request(keys, req.topologyVersion());
- }
- else
- forceFut = preldr.request(req, req.topologyVersion());
+ IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
if (forceFut == null || forceFut.isDone()) {
try {
@@ -1715,9 +1699,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
+ updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
}
else {
+ if (true)
+ throw new RuntimeException("error");
+
forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> fut) {
try {
@@ -1761,9 +1748,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
- private GridCacheVersion ver;
-
-
/**
* Executes local update after preloader fetched values.
*
@@ -1774,11 +1758,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void updateAllAsyncInternal0(
UUID nodeId,
- GridNearAtomicAbstractUpdateRequest req,
+ final GridNearAtomicAbstractUpdateRequest req,
int stripeIdx,
- UpdateReplyClosure completionCb
+ final UpdateReplyClosure completionCb
) {
- ClusterNode node = ctx.discovery().node(nodeId);
+ final ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
U.warn(msgLog, "Skip near update request, node originated update request left [" +
@@ -1787,25 +1771,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
- nodeId,
- req.futureId(),
- req.partition(),
- false,
- ctx.deploymentEnabled());
-
- res.partition(req.partition());
-
- int[] stripeIdxs = null;
-
- if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
- && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
- && req.stripeMap() != null) {
- stripeIdxs = req.stripeMap().get(stripeIdx);
-
- res.stripe(stripeIdx);
- }
-
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1830,103 +1795,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (top.stopping()) {
- addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
- "(cache is stopped): " + name()));
-
- completionCb.apply(req, res);
return;
}
// Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
- locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
-
- boolean hasNear = ctx.discovery().cacheNearNode(node, name());
-
- // Assign next version for update inside entries lock.
- if (ver == null)
- ver = ctx.versions().next(top.topologyVersion());
-
- if (hasNear)
- res.nearVersion(ver);
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureId() +
- ", writeVer=" + ver + ']');
- }
-
- assert ver != null : "Got null version for update request: " + req;
-
- boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
-
- int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
-
- dhtFut = null;//createDhtFuture(ver, req, size);
-
- expiry = expiryPolicy(req.expiry());
+ if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ Map<Integer, int[]> stripemap = req.stripeMap();
- GridCacheReturn retVal = null;
+ final GridDhtAtomicAbstractUpdateFuture fut = createDhtFuture(null, req, req.size());
- if (size > 1 && // Several keys ...
- writeThrough() && !req.skipStore() && // and store is enabled ...
- !ctx.store().isLocal() && // and this is not local store ...
- // (conflict resolver should be used for local store)
- !ctx.dr().receiveEnabled() // and no DR.
- ) {
- // This method can only be used when there are no replicated entries in the batch.
- UpdateBatchResult updRes = updateWithBatch(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal,
- stripeIdxs);
-
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
-
- if (req.operation() == TRANSFORM)
- retVal = updRes.invokeResults();
- }
- else {
- UpdateSingleResult updRes = updateSingle(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal,
- stripeIdxs);
+ ((GridNearAtomicFullUpdateRequest)req).responseHelper(new NearAtomicResponseHelper(stripemap.size()));
- retVal = updRes.returnValue();
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
+ for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
+ if (stripeIdx == e.getKey())
+ update(fut, node, req, e.getValue(), completionCb);
+ else {
+ ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() {
+ @Override public void run() {
+ try {
+ update(fut, node, req, e.getValue(), completionCb);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
}
-
- if (retVal == null)
- retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
-
- res.returnValue(retVal);
-
- if (dhtFut != null)
- ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
}
else {
// Should remap all keys.
remap = true;
- res.remapTopologyVersion(top.topologyVersion());
+ //res.remapTopologyVersion(top.topologyVersion());
}
}
finally {
@@ -1958,16 +1861,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
remap = true;
- res.remapTopologyVersion(ctx.topology().topologyVersion());
+ //res.remapTopologyVersion(ctx.topology().topologyVersion());
}
catch (Throwable e) {
// At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
// an attempt to use cleaned resources.
U.error(log, "Unexpected exception during cache update", e);
- addAllKeysAsFailed(req, res, stripeIdxs, e);
+ //addAllKeysAsFailed(req, res, stripeIdxs, e);
- completionCb.apply(req, res);
+ //completionCb.apply(req, res);
if (e instanceof Error)
throw e;
@@ -1975,23 +1878,91 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- if (remap) {
- assert dhtFut == null;
- res.stripe(-1);
+// if (remap) {
+// assert dhtFut == null;
+// res.stripe(-1);
+//
+// completionCb.apply(req, res);
+// }
+// else {
+// if (dhtFut != null)
+// dhtFut.map(node, res.returnValue(), res, completionCb);
+// else
+// completionCb.apply(req, res);
+// }
+//
+// if (req.writeSynchronizationMode() != FULL_ASYNC)
+// req.cleanup(!node.isLocal());
+//
+// sendTtlUpdateRequest(expiry);
+ }
+
+ private void update(
+ GridDhtAtomicAbstractUpdateFuture fut,
+ ClusterNode node,
+ GridNearAtomicAbstractUpdateRequest req,
+ int[] stripeIdxs,
+ UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ node.id(),
+ req.futureId(),
+ req.partition(),
+ false,
+ ctx.deploymentEnabled());
- completionCb.apply(req, res);
- }
- else {
- if (dhtFut != null)
- dhtFut.map(node, res.returnValue(), res, completionCb);
- else
- completionCb.apply(req, res);
+ List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
+
+ boolean hasNear = ctx.discovery().cacheNearNode(node, name());
+
+ // Assign next version for update inside entries lock.
+ //if (ver == null)
+ GridCacheVersion ver = ctx.versions().next(ctx.topology().topologyVersion());
+
+ if (hasNear)
+ res.nearVersion(ver);
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
+ ", writeVer=" + ver + ']');
}
- if (req.writeSynchronizationMode() != FULL_ASYNC)
- req.cleanup(!node.isLocal());
+ assert ver != null : "Got null version for update request: " + req;
+
+ boolean sndPrevVal = false;//!top.rebalanceFinished(req.topologyVersion());
+
+ int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+ GridCacheReturn retVal = null;
+
+ UpdateSingleResult updRes = updateSingle(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ null,
+ ctx.isDrEnabled(),
+ null,
+ null,
+ sndPrevVal,
+ stripeIdxs);
+
+ retVal = updRes.returnValue();
+
+ if (retVal == null)
+ retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
- sendTtlUpdateRequest(expiry);
+ res.returnValue(retVal);
+
+ unlockEntries(locked, null);
+
+ GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+
+ if (res0 != null) {
+ fut.onDone();
+
+ completionCb.apply(req, res);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
index 00c9f6c..55c450c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -17,8 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
*
@@ -28,14 +27,16 @@ public class NearAtomicResponseHelper {
/** */
private GridNearAtomicUpdateResponse res;
+ private static final AtomicIntegerFieldUpdater<NearAtomicResponseHelper> UPD =
+ AtomicIntegerFieldUpdater.newUpdater(NearAtomicResponseHelper.class, "cnt");
+
/** */
- private Set<Integer> stripes;
+ private volatile int cnt;
/**
- * @param stripes Stripes collection.
*/
- public NearAtomicResponseHelper(Set<Integer> stripes) {
- this.stripes = new HashSet<>(stripes);
+ public NearAtomicResponseHelper(int cnt) {
+ this.cnt = cnt;
}
/**
@@ -43,18 +44,16 @@ public class NearAtomicResponseHelper {
* @return {@code true} if all responses added.
*/
public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
- synchronized (this) {
- if (res.stripe() == -1)
- return res;
+ this.res = res;
- if (stripes.remove(res.stripe())) {
- mergeResponse(res);
+ int c = UPD.decrementAndGet(this);
- return stripes.isEmpty() ? this.res : null;
- }
+ //mergeResponse(res);
- return null;
- }
+ if (c == 0)
+ return this.res;
+
+ return null;
}
/**
[06/10] ignite git commit: ignite-4680 wip
Posted by sb...@apache.org.
ignite-4680 wip
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e7ee08a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e7ee08a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e7ee08a
Branch: refs/heads/ignite-4680-sb
Commit: 3e7ee08a486a7fcdd080c0807433abe22c3f171b
Parents: 855c66b
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 15:02:55 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 15:02:55 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 32 ++++++++++++++++----
.../GridNearAtomicAbstractUpdateRequest.java | 8 +++++
.../atomic/GridNearAtomicFullUpdateRequest.java | 6 ++--
.../distributed/near/GridNearAtomicCache.java | 5 ++-
4 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e68d72d..973256f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1684,7 +1684,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final int stripeIdx,
final UpdateReplyClosure completionCb
) {
- IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
+ IgniteInternalFuture<Object> forceFut;
+
+ if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
+ && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+ && req.stripeMap() != null) {
+ int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+
+ List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
+
+ for (int i = 0; i < stripeIdxs.length; i++)
+ keys.add(req.key(stripeIdxs[i]));
+
+ forceFut = preldr.request(keys, req.topologyVersion());
+ }
+ else
+ forceFut = preldr.request(req, req.topologyVersion());
if (forceFut == null || forceFut.isDone()) {
try {
@@ -1782,8 +1797,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
&& req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
- && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
- stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+ && req.stripeMap() != null) {
+ stripeIdxs = req.stripeMap().get(stripeIdx);
res.stripe(stripeIdx);
}
@@ -1958,6 +1973,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (remap) {
assert dhtFut == null;
+ res.stripe(-1);
completionCb.apply(req, res);
}
@@ -2888,8 +2904,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer,
int[] stripeIdxs)
throws GridDhtInvalidPartitionException {
- if (req.size() == 1) {
- KeyCacheObject key = req.key(0);
+
+ int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+ if (keysNum == 1) {
+ int idx = stripeIdxs != null ? stripeIdxs[0] : 0;
+
+ KeyCacheObject key = req.key(idx);
while (true) {
GridDhtCacheEntry entry = entryExx(key, topVer);
@@ -2903,7 +2924,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
else {
- int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
while (true) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 0748434..c8e904d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
@@ -443,6 +444,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
*/
public abstract KeyCacheObject key(int idx);
+ /**
+ * @return Stripe map.
+ */
+ @Nullable public Map<Integer, int[]> stripeMap() {
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 10;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 5a8c66b..2e619ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -404,10 +404,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
return expiryPlc;
}
- /**
- * @return Stripe mapping.
- */
- @Nullable public Map<Integer, int[]> stripeMap() {
+ /** {@inheritDoc} */
+ @Override @Nullable public Map<Integer, int[]> stripeMap() {
return stripeMap;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 5844021..299386c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -134,8 +133,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
int keyNum;
int[] stripeIdxs;
- if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
- stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+ if (res.stripe() > -1 && req.stripeMap() != null) {
+ stripeIdxs = req.stripeMap().get(res.stripe());
keyNum = stripeIdxs.length;
}
else {
[02/10] ignite git commit: ignite-4680-2
Posted by sb...@apache.org.
ignite-4680-2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1b4ebd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1b4ebd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1b4ebd4
Branch: refs/heads/ignite-4680-sb
Commit: d1b4ebd48290af5fb3a2d0d9df57219dd4ce1edb
Parents: 8e5e3cb
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Wed Mar 15 19:41:49 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Wed Mar 15 19:41:49 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 6 +-
.../ignite/internal/IgniteNodeAttributes.java | 3 +
.../managers/communication/GridIoManager.java | 23 +-
.../cache/GridCacheAffinityManager.java | 21 +-
.../processors/cache/GridCacheAtomicFuture.java | 2 +-
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 6 +-
.../cache/GridCacheManagerAdapter.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 14 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 184 ++++++++---
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 +-
.../GridNearAtomicAbstractUpdateFuture.java | 102 +++++-
.../GridNearAtomicAbstractUpdateRequest.java | 46 +--
.../atomic/GridNearAtomicFullUpdateRequest.java | 127 ++++++-
.../GridNearAtomicSingleUpdateFuture.java | 25 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 72 ++--
.../atomic/GridNearAtomicUpdateResponse.java | 40 ++-
.../distributed/near/GridNearAtomicCache.java | 27 +-
.../cache/version/GridCacheVersionManager.java | 47 +++
.../apache/ignite/internal/util/MPSCQueue.java | 331 +++++++++++++++++++
.../ignite/internal/util/StripedExecutor.java | 125 ++++++-
.../ignite/thread/IgniteStripeThread.java | 47 +++
.../GridNearAtomicFullUpdateRequestTest.java | 74 +++++
.../dht/atomic/MarshallingAbstractTest.java | 156 +++++++++
.../version/GridCacheVersionManagerTest.java | 86 +++++
26 files changed, 1380 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e..d37e4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -227,6 +227,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_RESTART_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_PORT_RANGE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SPI_CLASS;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.BUILD_TSTAMP_STR;
@@ -1167,7 +1168,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
" ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
sysPoolIdleThreads + ", qSize=" + sysPoolQSize + "]" + NL +
- " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
+ " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]" + NL +
+ " ^-- Striped executor [" + ctx.getStripedExecutorService().getMetrics() + "]";
log.info(msg);
}
@@ -1440,6 +1442,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
+ add(ATTR_STRIPES_CNT, ctx.getStripedExecutorService() != null ? ctx.getStripedExecutorService().stripes() : -1);
+
if (cfg.getConsistentId() != null)
add(ATTR_NODE_CONSISTENT_ID, cfg.getConsistentId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 8294026..f1a2558 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -168,6 +168,9 @@ public final class IgniteNodeAttributes {
/** Ignite services compatibility mode (can be {@code null}). */
public static final String ATTR_SERVICES_COMPATIBILITY_MODE = ATTR_PREFIX + ".services.compatibility.enabled";
+ /** Number of stripes in the node's striped executor ({@code -1} if the node has no striped executor). */
+ public static final String ATTR_STRIPES_CNT = ATTR_PREFIX + ".cache.stripes.cnt";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 23738d7..5fd2845 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -54,11 +54,13 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -97,9 +99,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGF
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -807,19 +809,32 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
};
+ StripedExecutor stripedExecutor = ctx.getStripedExecutorService();
+
if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
if (msg0.processFromNioThread())
c.run();
else
- ctx.getStripedExecutorService().execute(-1, c);
+ stripedExecutor.execute(-1, c);
return;
}
- if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) {
- ctx.getStripedExecutorService().execute(msg.partition(), c);
+ if (plc == GridIoPolicy.SYSTEM_POOL &&
+ (msg.partition() != Integer.MIN_VALUE ||
+ msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
+ Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+ ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+
+ if (stripemap != null) {
+ for (Integer stripe : stripemap.keySet()) {
+ stripedExecutor.execute(stripe, c);
+ }
+ }
+ else
+ stripedExecutor.execute(msg.partition(), c);
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 17c9319..621634c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
@@ -33,12 +38,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
/**
* Cache affinity manager.
*/
@@ -88,10 +87,15 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
*
*/
public void cancelFutures() {
+ if (!starting.get())
+ // Ignoring attempt to stop manager that has never been started.
+ return;
+
IgniteCheckedException err =
new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
- aff.cancelFutures(err);
+ if (aff != null)
+ aff.cancelFutures(err);
}
/** {@inheritDoc} */
@@ -99,7 +103,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
"Failed to wait for topology update, client disconnected.");
- aff.cancelFutures(err);
+ if (aff != null)
+ aff.cancelFutures(err);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8df229e..87ae29c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
* @return Future ID.
*/
- public Long id();
+ public long id();
/**
* Gets future that will be completed when it is safe when update is finished on the given version of topology.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 06c707e..e76b773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1966,7 +1966,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (lock.readLock().tryLock()) {
try {
if (log.isDebugEnabled())
- log.debug("Entered to eviction future onResponse() [fut=" + this + ", node=" + nodeId +
+ log.debug("Entered to eviction future storeResponse() [fut=" + this + ", node=" + nodeId +
", res=" + res + ']');
ClusterNode node = cctx.node(nodeId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 99878ec..3cf68d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -431,7 +431,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
append(", dhtTxId=").append(dhtTxId(cacheMsg)).
append(", msg=").append(cacheMsg);
}
- else if (atomicFututeId(cacheMsg) != null) {
+ else if (atomicFututeId(cacheMsg) > -1) {
builder.append("futId=").append(atomicFututeId(cacheMsg)).
append(", writeVer=").append(atomicWriteVersion(cacheMsg)).
append(", msg=").append(cacheMsg);
@@ -484,7 +484,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param cacheMsg Cache message.
* @return Atomic future ID if applicable for message.
*/
- @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
+ @Nullable private long atomicFututeId(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
@@ -496,7 +496,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest)
return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
- return null;
+ return -1;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
index 8ad0ea8..ab965de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
@@ -34,7 +34,7 @@ public class GridCacheManagerAdapter<K, V> implements GridCacheManager<K, V> {
protected IgniteLogger log;
/** Starting flag. */
- private final AtomicBoolean starting = new AtomicBoolean(false);
+ protected final AtomicBoolean starting = new AtomicBoolean(false);
/** {@inheritDoc} */
@Override public final void start(GridCacheContext<K, V> cctx) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dff2c88..c4cbefd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -132,7 +132,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
};
/** Logger. */
- @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private IgniteLogger exchLog;
/** */
@@ -256,9 +256,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
cacheFut.onNodeLeft(discoEvt.eventNode().id());
if (cacheFut.isCancelled() || cacheFut.isDone()) {
- Long futId = cacheFut.id();
+ long futId = cacheFut.id();
- if (futId != null)
+ if (futId > -1)
atomicFuts.remove(futId, cacheFut);
}
}
@@ -437,7 +437,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param fut Future.
* @return {@code False} if future was forcibly completed with error.
*/
- public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+ public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
@@ -472,7 +472,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param futId Future ID.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+ @Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
return atomicFuts.get(futId);
}
@@ -480,7 +480,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param futId Future ID.
* @return Removed future.
*/
- @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+ @Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
return atomicFuts.remove(futId);
}
@@ -511,8 +511,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
-
- /**
* Adds future.
*
* @param fut Future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..17ee298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/** {@inheritDoc} */
- @Override public final Long id() {
+ public final long id() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..4ab5000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -101,6 +101,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteStripeThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -278,9 +279,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req
) {
+ int stripeIdx;
+
+ if (req instanceof GridNearAtomicFullUpdateRequest && Thread.currentThread() instanceof IgniteStripeThread)
+ stripeIdx = ((IgniteStripeThread)Thread.currentThread()).stripeIndex();
+ else
+ stripeIdx = IgniteStripeThread.GRP_IDX_UNASSIGNED;
+
processNearAtomicUpdateRequest(
nodeId,
- req);
+ req,
+ stripeIdx);
}
@Override public String toString() {
@@ -1640,6 +1649,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * Executes local update after preloader fetched values.
+ *
+ * @param nodeId Node ID.
+ * @param req Update request.
+ * @param completionCb Completion callback.
+ */
+ public void updateAllAsyncInternal(
+ final UUID nodeId,
+ final GridNearAtomicAbstractUpdateRequest req,
+ final UpdateReplyClosure completionCb
+ ) {
+ updateAllAsyncInternal(nodeId, req, IgniteStripeThread.GRP_IDX_UNASSIGNED, completionCb);
+ }
+
+ /**
* Executes local update.
*
* @param nodeId Node ID.
@@ -1649,6 +1673,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
void updateAllAsyncInternal(
final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
+ final int stripeIdx,
final UpdateReplyClosure completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
@@ -1667,7 +1692,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(nodeId, req, completionCb);
+ updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
}
else {
forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1684,7 +1709,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(nodeId, req, completionCb);
+ updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
}
});
}
@@ -1718,11 +1743,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*
* @param nodeId Node ID.
* @param req Update request.
+ * @param stripeIdx Stripe index.
* @param completionCb Completion callback.
*/
private void updateAllAsyncInternal0(
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req,
+ int stripeIdx,
UpdateReplyClosure completionCb
) {
ClusterNode node = ctx.discovery().node(nodeId);
@@ -1741,6 +1768,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
ctx.deploymentEnabled());
+ res.partition(req.partition());
+
+ int[] stripeIdxs = null;
+
+ if (stripeIdx != IgniteStripeThread.GRP_IDX_UNASSIGNED
+ && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+ && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
+ stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+
+ res.stripe(stripeIdx);
+ }
+
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1765,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (top.stopping()) {
- res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
+ addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
"(cache is stopped): " + name()));
completionCb.apply(req, res);
@@ -1776,7 +1815,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
- locked = lockEntries(req, req.topologyVersion());
+ locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
@@ -1795,13 +1834,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
- dhtFut = createDhtFuture(ver, req);
+ int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+ dhtFut = createDhtFuture(ver, req, size);
expiry = expiryPolicy(req.expiry());
GridCacheReturn retVal = null;
- if (req.size() > 1 && // Several keys ...
+ if (size > 1 && // Several keys ...
writeThrough() && !req.skipStore() && // and store is enabled ...
!ctx.store().isLocal() && // and this is not local store ...
// (conflict resolver should be used for local store)
@@ -1818,7 +1859,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.isDrEnabled(),
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
@@ -1837,7 +1879,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.isDrEnabled(),
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
retVal = updRes.returnValue();
deleted = updRes.deleted();
@@ -1895,7 +1938,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// an attempt to use cleaned resources.
U.error(log, "Unexpected exception during cache update", e);
- res.addFailedKeys(req.keys(), e);
+ addAllKeysAsFailed(req, res, stripeIdxs, e);
completionCb.apply(req, res);
@@ -1921,6 +1964,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * Adds all keys as failed to response.
+ *
+ * @param req Request.
+ * @param res Response.
+ * @param stripeIdx Indexes of the request keys to mark as failed, or {@code null} to mark all request keys.
+ * @param e Throwable.
+ */
+ private void addAllKeysAsFailed(GridNearAtomicAbstractUpdateRequest req,
+ GridNearAtomicUpdateResponse res,
+ int[] stripeIdx,
+ Throwable e) {
+
+ if (stripeIdx == null)
+ res.addFailedKeys(req.keys(), e);
+ else {
+ for (int i = 0; i < stripeIdx.length; i++)
+ res.addFailedKey(req.key(stripeIdx[i]), e);
+ }
+ }
+
+ /**
* Updates locked entries using batched write-through.
*
* @param node Sender node.
@@ -1934,6 +1998,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param taskName Task name.
* @param expiry Expiry policy.
* @param sndPrevVal If {@code true} sends previous value to backups.
+ * @param stripeIdxs Stripe indexes.
* @return Deleted entries.
* @throws GridCacheEntryRemovedException Should not be thrown.
*/
@@ -1949,7 +2014,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean replicate,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
- final boolean sndPrevVal
+ final boolean sndPrevVal,
+ final int[] stripeIdxs
) throws GridCacheEntryRemovedException {
assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1959,13 +2025,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
reloadIfNeeded(locked);
}
catch (IgniteCheckedException e) {
- res.addFailedKeys(req.keys(), e);
+ addAllKeysAsFailed(req, res, stripeIdxs, e);
return new UpdateBatchResult();
}
}
- int size = req.size();
+ int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
Map<KeyCacheObject, CacheObject> putMap = null;
@@ -1990,6 +2056,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (int i = 0; i < locked.size(); i++) {
GridDhtCacheEntry entry = locked.get(i);
+ int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
try {
if (!checkFilter(entry, req, res)) {
if (expiry != null && entry.hasValue()) {
@@ -2017,7 +2085,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (op == TRANSFORM) {
- EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+ EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(trueIdx);
CacheObject old = entry.innerGet(
ver,
@@ -2099,7 +2167,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes,
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
firstEntryIdx = i;
@@ -2147,7 +2216,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes,
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
firstEntryIdx = i;
@@ -2172,12 +2242,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessorMap.put(entry.key(), entryProcessor);
}
else if (op == UPDATE) {
- CacheObject updated = req.value(i);
+ CacheObject updated = req.value(trueIdx);
if (intercept) {
CacheObject old = entry.innerGet(
- null,
- null,
+ null,
+ null,
/*read swap*/true,
/*read through*/ctx.loadPreviousValue(),
/*metrics*/true,
@@ -2273,7 +2343,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes,
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
}
else
assert filtered.isEmpty();
@@ -2350,6 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param taskName Task name.
* @param expiry Expiry policy.
* @param sndPrevVal If {@code true} sends previous value to backups.
+ * @param stripeIdxs Indexes of the request keys to process, or {@code null} to process all request keys.
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
@@ -2364,7 +2436,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
- boolean sndPrevVal
+ boolean sndPrevVal,
+ int[] stripeIdxs
) throws GridCacheEntryRemovedException {
GridCacheReturn retVal = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -2377,9 +2450,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+ int keyNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
// Avoid iterator creation.
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject k = req.key(i);
+ for (int i = 0; i < keyNum; i++) {
+ int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+ KeyCacheObject k = req.key(trueIdx);
GridCacheOperation op = req.operation();
@@ -2388,13 +2465,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = locked.get(i);
- GridCacheVersion newConflictVer = req.conflictVersion(i);
- long newConflictTtl = req.conflictTtl(i);
- long newConflictExpireTime = req.conflictExpireTime(i);
+ GridCacheVersion newConflictVer = req.conflictVersion(trueIdx);
+ long newConflictTtl = req.conflictTtl(trueIdx);
+ long newConflictExpireTime = req.conflictExpireTime(trueIdx);
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
- Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
+ Object writeVal = op == TRANSFORM ? req.entryProcessor(trueIdx) : req.writeValue(trueIdx);
Collection<UUID> readers = null;
Collection<UUID> filteredReaders = null;
@@ -2478,13 +2555,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue()) {
- res.addNearValue(i,
+ res.addNearValue(trueIdx,
updRes.newValue(),
updRes.newTtl(),
updRes.conflictExpireTime());
}
else
- res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
+ res.addNearTtl(trueIdx, updRes.newTtl(), updRes.conflictExpireTime());
if (updRes.newValue() != null) {
IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2495,10 +2572,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
entry.removeReader(nearNode.id(), req.messageId());
else
- res.addSkippedIndex(i);
+ res.addSkippedIndex(trueIdx);
}
else
- res.addSkippedIndex(i);
+ res.addSkippedIndex(trueIdx);
}
if (updRes.removeVersion() != null) {
@@ -2548,7 +2625,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param hasNear {@code True} if originating node has near cache.
- * @param firstEntryIdx Index of the first entry in the request keys collection.
+ * @param firstEntryIdx Index of the first entry in the request keys collection, or in {@code stripeIdxs} when it is not {@code null}.
* @param entries Entries to update.
* @param ver Version to set.
* @param nearNode Originating node.
@@ -2584,7 +2661,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final UpdateBatchResult batchRes,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
- final boolean sndPrevVal
+ final boolean sndPrevVal,
+ final int[] stripeIdxs
) {
assert putMap == null ^ rmvKeys == null;
@@ -2737,17 +2815,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
+ // Index of this entry within the full list of request keys.
+ int trueIdx = stripeIdxs == null ? firstEntryIdx + i : stripeIdxs[firstEntryIdx + i];
+
if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
int idx = firstEntryIdx + i;
if (req.operation() == TRANSFORM) {
- res.addNearValue(idx,
+ res.addNearValue(trueIdx,
writeVal,
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE);
}
else
- res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+ res.addNearTtl(trueIdx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
if (writeVal != null || entry.hasValue()) {
IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2758,7 +2839,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else if (readers.contains(nearNode.id())) // Reader became primary or backup.
entry.removeReader(nearNode.id(), req.messageId());
else
- res.addSkippedIndex(firstEntryIdx + i);
+ res.addSkippedIndex(trueIdx);
}
}
catch (GridCacheEntryRemovedException e) {
@@ -2789,12 +2870,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*
* @param req Request with keys to lock.
* @param topVer Topology version to lock on.
+ * @param stripeIdxs Indexes of the request keys to lock, or {@code null} to lock all request keys.
* @return Collection of locked entries.
* @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown,
* locks are released.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer)
+ private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req,
+ AffinityTopologyVersion topVer,
+ int[] stripeIdxs)
throws GridDhtInvalidPartitionException {
if (req.size() == 1) {
KeyCacheObject key = req.key(0);
@@ -2811,11 +2895,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
else {
- List<GridDhtCacheEntry> locked = new ArrayList<>(req.size());
+ int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+ List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
while (true) {
- for (int i = 0; i < req.size(); i++) {
- GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+ for (int i = 0; i < keysNum; i++) {
+ int idx = stripeIdxs == null ? i : stripeIdxs[i];
+
+ GridDhtCacheEntry entry = entryExx(req.key(idx), topVer);
locked.add(entry);
}
@@ -3003,25 +3090,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ int size
) {
- if (updateReq.size() == 1)
+ if (size == 1)
return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
else
- return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, size);
}
/**
* @param nodeId Sender node ID.
+ * @param stripeIdx Stripe number.
* @param req Near atomic update request.
*/
- private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req, int stripeIdx) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
- ", node=" + nodeId + ']');
+ ", node=" + nodeId + ", stripe=" + stripeIdx + ']');
}
- updateAllAsyncInternal(nodeId, req, updateReplyClos);
+ updateAllAsyncInternal(nodeId, req, stripeIdx, updateReplyClos);
}
/**
@@ -3031,7 +3120,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
+ msgLog.debug("Received near atomic update response " +
+ "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() + ']');
res.nodeId(ctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..af9fd8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,11 +49,12 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ int size
) {
super(cctx, writeVer, updateReq);
- mappings = U.newHashMap(updateReq.size());
+ mappings = U.newHashMap(size);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..770999a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -138,7 +139,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Future ID. */
@GridToStringInclude
- protected Long futId;
+ protected long futId = -1;
/** Operation result. */
protected GridCacheReturn opRes;
@@ -356,7 +357,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @return Response to notify about primary failure.
*/
final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
- assert req.response() == null : req;
assert req.nodeId() != null : req;
if (msgLog.isDebugEnabled()) {
@@ -423,6 +423,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
final GridNearAtomicAbstractUpdateRequest req;
/** */
+ private GridNearAtomicUpdateResponse res;
+
+ /** Responses received per stripe, keyed by stripe number. */
+ private Map<Integer, GridNearAtomicUpdateResponse> resMap;
+
+ /** */
@GridToStringInclude
Set<UUID> dhtNodes;
@@ -431,7 +437,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
private Set<UUID> rcvd;
/** */
- private boolean hasRes;
+ private boolean hasDhtRes;
/**
* @param req Request.
@@ -516,13 +522,24 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
+ * @return {@code True} if all near results gathered.
+ */
+ private boolean hasAllNearResults() {
+ return res != null || (resMap != null && resMap.size() == req.stripes());
+ }
+
+ public String state() {
+ return resMap != null ? resMap.size() + "/" + req.stripes() : res != null ? "res" : "no";
+ }
+
+ /**
* @return {@code True} if all expected responses are received.
*/
private boolean finished() {
if (req.writeSynchronizationMode() == PRIMARY_SYNC)
- return hasRes;
+ return hasAllNearResults();
- return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
+ return (dhtNodes != null && dhtNodes.isEmpty()) && hasDhtRes;
}
/**
@@ -536,13 +553,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* When primary failed, even if primary response is received, it is possible it failed to send
* request to backup(s), need remap operation.
*/
- if (req.fullSync() && !req.nodeFailedResponse()) {
- req.resetResponse();
+ if (req.fullSync() && !nodeFailedResponse()) {
+ resetResponse();
return req;
}
- return req.response() == null ? req : null;
+ return !hasAllNearResults() ? req : null; // Request is returned only while near results are still missing.
}
/**
@@ -559,7 +576,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
if (finished())
return null;
- return req.response() == null ? req : null;
+ return !hasAllNearResults() ? req : null;
}
/**
@@ -571,7 +588,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return DhtLeftResult.NOT_DONE;
if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
- if (hasRes)
+ if (hasDhtRes)
return DhtLeftResult.DONE;
else
return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
@@ -592,7 +609,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return false;
if (res.hasResult())
- hasRes = true;
+ hasDhtRes = true;
if (dhtNodes == null) {
if (rcvd == null)
@@ -614,9 +631,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
assert !finished() : this;
- hasRes = true;
-
- boolean onRes = req.onResponse(res);
+ boolean onRes = storeResponse(res);
assert onRes;
@@ -639,7 +654,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param cctx Context.
*/
private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
- assert dhtNodes == null || req.initMappingLocally();
+ assert F.isEmpty(dhtNodes) || req.initMappingLocally();
Set<UUID> dhtNodes0 = dhtNodes;
@@ -664,12 +679,67 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
dhtNodes = Collections.emptySet();
}
+ /**
+ * @return {@code True} if received notification about primary fail.
+ */
+ boolean nodeFailedResponse() {
+ return res != null && res.nodeLeftResponse();
+ }
+
+ /**
+ * @param res Response; stored per stripe when {@code res.stripe() > -1}, otherwise as the single response.
+ * @return {@code True} if no response was stored yet for that stripe (or as the single response).
+ */
+ private boolean storeResponse(GridNearAtomicUpdateResponse res) {
+ if (res.stripe() > -1) {
+ if (resMap == null)
+ resMap = U.newHashMap(req.stripes());
+
+ if (!resMap.containsKey(res.stripe())) {
+ resMap.put(res.stripe(), res);
+ return true;
+ }
+ }
+ else {
+ if (this.res == null) {
+ this.res = res;
+// NOTE(review): clearing the stripe map here was disabled (this.resMap = null) - confirm whether that is intended.
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Clears the stored single response and the per-stripe response map.
+ */
+ void resetResponse() {
+ res = null;
+ resMap = null;
+ }
+
+ /**
+ * @return Single (non-striped) response, or {@code null} if none is stored.
+ */
+ @Nullable public GridNearAtomicUpdateResponse response() {
+ return res;
+ }
+
+ /**
+ * @return Responses keyed by stripe number, or {@code null} if no striped response is stored.
+ */
+ @Nullable public Map<Integer, GridNearAtomicUpdateResponse> responses() {
+ return resMap;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PrimaryRequestState.class, this,
"primary", primaryId(),
"needPrimaryRes", req.needPrimaryResponse(),
- "primaryRes", req.response() != null,
+ "primaryRes", this.res != null,
"done", finished());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..b549370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -89,10 +89,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
@GridToStringExclude
protected byte flags;
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
/**
*
*/
@@ -249,41 +245,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
- */
- public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /**
- *
- */
- void resetResponse() {
- this.res = null;
- }
-
- /**
- * @return Response.
- */
- @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /**
- * @return {@code True} if received notification about primary fail.
- */
- boolean nodeFailedResponse() {
- return res != null && res.nodeLeftResponse();
- }
-
- /**
* @return Topology locked flag.
*/
final boolean topologyLocked() {
@@ -451,6 +412,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
public abstract int size();
/**
+ * @return Number of stripes.
+ */
+ public int stripes() {
+ return 0;
+ }
+
+ /**
* @param idx Key index.
* @return Key.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c381333..5a8c66b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -21,13 +21,16 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -40,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,6 +63,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** */
private static final long serialVersionUID = 0L;
+ public static final int DIRECT_TYPE = 40;
+
/** Keys to update. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -70,6 +74,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
+ /** Number of key indexes recorded so far for each stripe (array index is the stripe number). */
+ @GridDirectTransient
+ private int[] stripeCnt;
+
+ /** Stripe number to key indexes mapping. */
+ @GridDirectMap(keyType = int.class, valueType = int[].class)
+ private Map<Integer, int[]> stripeMap;
+
/** Entry processors. */
@GridDirectTransient
private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -109,6 +121,17 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectTransient
private int initSize;
+ /** Maximum number of keys. */
+ @GridDirectTransient
+ private int maxEntryCnt;
+
+ /** Number of stripes on remote node. */
+ @GridDirectTransient
+ private int maxStripes;
+
+ /** Partition ID. */
+ private int partId;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -155,7 +178,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
boolean skipStore,
boolean keepBinary,
boolean addDepInfo,
- int maxEntryCnt
+ int maxEntryCnt,
+ int maxStripes
) {
super(cacheId,
nodeId,
@@ -181,6 +205,9 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
// than 10, we use it.
initSize = Math.min(maxEntryCnt, 10);
+ this.maxEntryCnt = maxEntryCnt;
+ this.maxStripes = maxStripes;
+
keys = new ArrayList<>(initSize);
}
@@ -200,6 +227,25 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
assert val != null || op == DELETE;
+ if (maxStripes > 1) {
+ int stripe = key.partition() % maxStripes;
+
+ if (stripeCnt == null)
+ stripeCnt = new int[maxStripes];
+
+ if (stripeMap == null)
+ stripeMap = new HashMap<>(maxStripes);
+
+ int[] idxs = stripeMap.get(stripe);
+
+ if (idxs == null)
+ stripeMap.put(stripe, idxs = new int[maxEntryCnt]);
+
+ int idx = stripeCnt[stripe];
+ idxs[idx] = keys.size();
+ stripeCnt[stripe] = idx + 1;
+ }
+
keys.add(key);
if (entryProcessor != null) {
@@ -267,6 +313,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
+ @Override public int stripes() {
+ return stripeMap == null ? 0 : stripeMap.size(); // Stripe mapping is only built when maxStripes > 1.
+ }
+
+ /** {@inheritDoc} */
@Override public KeyCacheObject key(int idx) {
return keys.get(idx);
}
@@ -353,6 +404,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
return expiryPlc;
}
+ /**
+ * @return Stripe mapping.
+ */
+ @Nullable public Map<Integer, int[]> stripeMap() {
+ return stripeMap;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -362,6 +420,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
if (expiryPlc != null && expiryPlcBytes == null)
expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+ if (stripeMap != null && stripeCnt != null) {
+ for (Integer idx : stripeMap.keySet()) {
+ stripeMap.put(idx, Arrays.copyOf(stripeMap.get(idx), stripeCnt[idx]));
+ }
+
+ stripeCnt = null;
+ }
+
prepareMarshalCacheObjects(keys, cctx);
if (filter != null) {
@@ -425,9 +491,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public int partition() {
- assert !F.isEmpty(keys);
+ return partId;
+ }
- return keys.get(0).partition();
+ /**
+ * @param partId Partition.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
}
/** {@inheritDoc} */
@@ -494,6 +565,18 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
writer.incrementState();
case 18:
+ if (!writer.writeInt("partId", partId))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeMap("stripeMap", stripeMap, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -580,6 +663,22 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 18:
+ partId = reader.readInt("partId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ stripeMap = reader.readMap("stripeMap", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -594,24 +693,24 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public void cleanup(boolean clearKeys) {
- vals = null;
- entryProcessors = null;
- entryProcessorsBytes = null;
- invokeArgs = null;
- invokeArgsBytes = null;
-
- if (clearKeys)
- keys = null;
+// vals = null;
+// entryProcessors = null;
+// entryProcessorsBytes = null;
+// invokeArgs = null;
+// invokeArgsBytes = null;
+//
+// if (clearKeys)
+// keys = null;
}
/** {@inheritDoc} */
@Override public byte directType() {
- return 40;
+ return DIRECT_TYPE;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 19;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 930c4af..2b6d2c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- @Override public Long id() {
+ @Override public long id() {
synchronized (mux) {
return futId;
}
@@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
AffinityTopologyVersion remapTopVer0;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
assert reqState != null;
@@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
CachePartialUpdateCheckedException err0 = null;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
req = reqState.processPrimaryResponse(nodeId, res);
@@ -331,7 +331,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @return Non-null topology version if update should be remapped.
*/
private AffinityTopologyVersion onAllReceived() {
- assert futId != null;
+ assert futId != -1;
AffinityTopologyVersion remapTopVer0 = null;
@@ -362,7 +362,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cctx.mvcc().removeAtomicFuture(futId);
reqState = null;
- futId = null;
+ futId = -1;
topVer = AffinityTopologyVersion.ZERO;
remapTopVer = null;
@@ -488,7 +488,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
reqState0 = mapSingleUpdate(topVer, futId);
synchronized (mux) {
- assert this.futId == null : this;
+ assert this.futId == -1 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
this.topVer = topVer;
@@ -530,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @param futId
* @return
*/
- private boolean checkDhtNodes(Long futId) {
+ private boolean checkDhtNodes(long futId) {
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
AffinityTopologyVersion remapTopVer0 = null;
@@ -538,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicCheckUpdateRequest checkReq = null;
synchronized (mux) {
- if (this.futId == null || !this.futId.equals(futId))
+ if (this.futId == -1 || this.futId != futId)
return false;
assert reqState != null;
@@ -568,13 +568,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/**
* @return Future ID.
*/
- private Long onFutureDone() {
+ private long onFutureDone() {
Long id0;
synchronized (mux) {
id0 = futId;
- futId = null;
+ futId = -1;
}
return id0;
@@ -694,6 +694,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
skipStore,
keepBinary,
cctx.deploymentEnabled(),
+ 1,
1);
}
@@ -721,9 +722,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
if (nearEnabled) {
- assert reqState.req.response() != null;
+ assert reqState.response() != null;
- updateNear(reqState.req, reqState.req.response());
+ updateNear(reqState.req, reqState.response());
}
onDone(opRes, err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..9c4de66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -57,6 +57,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
@@ -150,7 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override public Long id() {
+ @Override public long id() {
synchronized (mux) {
return futId;
}
@@ -167,7 +168,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
synchronized (mux) {
- if (futId == null)
+ if (futId == -1)
return false;
if (singleReq != null) {
@@ -282,10 +283,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- Long futId = onFutureDone();
+ long futVer = onFutureDone();
- if (futId != null)
- cctx.mvcc().removeAtomicFuture(futId);
+ if (futVer > -1)
+ cctx.mvcc().removeAtomicFuture(futVer);
return true;
}
@@ -300,7 +301,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
AffinityTopologyVersion remapTopVer0;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
PrimaryRequestState reqState;
@@ -370,10 +371,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
- boolean rcvAll;
+ boolean rcvAll = false;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
if (singleReq != null) {
@@ -408,6 +409,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
rcvAll = false;
}
+ if (msgLog.isDebugEnabled())
+ msgLog.debug("Processed near atomic update response " +
+ "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() +
+ ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") +
+ " " + reqState.state() + ']');
}
else
return;
@@ -466,11 +472,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (rcvAll && nearEnabled) {
if (mappings != null) {
for (PrimaryRequestState reqState : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = reqState.req.response();
+ if (reqState.responses() != null) {
+ for (GridNearAtomicUpdateResponse res0 : reqState.responses().values()) {
+ updateNear(reqState.req, res0);
+ }
+ }
+ else {
+ GridNearAtomicUpdateResponse res0 = reqState.response();
- assert res0 != null : reqState;
+ assert res0 != null : reqState;
- updateNear(reqState.req, res0);
+ updateNear(reqState.req, res0);
+ }
}
}
else if (!nodeErr)
@@ -534,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @return Non null topology version if update should be remapped.
*/
@Nullable private AffinityTopologyVersion onAllReceived() {
- assert futId != null;
+ assert futId != -1;
AffinityTopologyVersion remapTopVer0 = null;
@@ -577,7 +590,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (remapTopVer0 != null) {
cctx.mvcc().removeAtomicFuture(futId);
- futId = null;
+ futId = -1;
topVer = AffinityTopologyVersion.ZERO;
remapTopVer = null;
@@ -596,7 +609,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (nearEnabled) {
if (mappings != null) {
for (PrimaryRequestState reqState : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = reqState.req.response();
+ GridNearAtomicUpdateResponse res0 = reqState.response();
assert res0 != null : reqState;
@@ -604,9 +617,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
}
else {
- assert singleReq != null && singleReq.req.response() != null;
+ assert singleReq != null && singleReq.response() != null;
- updateNear(singleReq.req, singleReq.req.response());
+ updateNear(singleReq.req, singleReq.response());
}
}
@@ -767,7 +780,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- Long futId = cctx.mvcc().atomicFutureId();
+ long futId = cctx.mvcc().atomicFutureId();
Exception err = null;
PrimaryRequestState singleReq0 = null;
@@ -801,7 +814,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
synchronized (mux) {
- assert this.futId == null : this;
+ assert this.futId == -1 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
this.topVer = topVer;
@@ -866,7 +879,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll = false;
synchronized (mux) {
- if (this.futId == null || !this.futId.equals(futId))
+ if (this.futId == -1 || this.futId != futId)
return;
if (singleReq != null) {
@@ -934,13 +947,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/**
* @return Future version.
*/
- private Long onFutureDone() {
- Long id0;
+ private long onFutureDone() {
+ long id0;
synchronized (mux) {
id0 = futId;
- futId = null;
+ futId = -1;
}
return id0;
@@ -957,7 +970,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@SuppressWarnings("ForLoopReplaceableByForEach")
private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
- Long futId,
+ long futId,
@Nullable Collection<KeyCacheObject> remapKeys,
boolean mappingKnown) throws Exception {
Iterator<?> it = null;
@@ -977,6 +990,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
+ int part = (int)(futId & 0xFFFF);
+
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null)
@@ -1045,6 +1060,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
PrimaryRequestState mapped = pendingMappings.get(nodeId);
if (mapped == null) {
+ int stripes = primary.attribute(ATTR_STRIPES_CNT) != null ? (int)primary.attribute(ATTR_STRIPES_CNT) : -1;
+
GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
nodeId,
@@ -1063,7 +1080,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
skipStore,
keepBinary,
cctx.deploymentEnabled(),
- keys.size());
+ keys.size(),
+ stripes
+ );
+
+ req.partition(part);
mapped = new PrimaryRequestState(req, nodes, false);
@@ -1086,7 +1107,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @return Request.
* @throws Exception If failed.
*/
- private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId, boolean mappingKnown)
throws Exception {
Object key = F.first(keys);
@@ -1168,6 +1189,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
skipStore,
keepBinary,
cctx.deploymentEnabled(),
+ 1,
1);
req.addUpdateEntry(cacheKey,
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 4e20fc7..f8b45d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -96,6 +96,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Partition ID. */
private int partId = -1;
+ /** Stripe. */
+ private int stripe = -1;
+
/** */
@GridDirectCollection(UUID.class)
@GridToStringInclude
@@ -182,6 +185,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
+ * @param partId Partition ID for proper striping on near node.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
+ }
+
+ /**
* Sets update error.
*
* @param err Error.
@@ -427,6 +437,20 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
return partId;
}
+ /**
+ * @return Stripe number.
+ */
+ public int stripe() {
+ return stripe;
+ }
+
+ /**
+ * @param stripe Stripe number.
+ */
+ public void stripe(int stripe) {
+ this.stripe = stripe;
+ }
+
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
@@ -524,6 +548,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
+ case 15:
+ if (!writer.writeInt("stripe", stripe))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -636,6 +666,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
+ case 15:
+ stripe = reader.readInt("stripe");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
@@ -648,7 +686,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 16;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 62aecd1..5844021 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -130,7 +131,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res
) {
- if (F.size(res.failedKeys()) == req.size())
+ int keyNum;
+ int[] stripeIdxs;
+
+ if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
+ stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+ keyNum = stripeIdxs.length;
+ }
+ else {
+ stripeIdxs = null;
+ keyNum = req.keys().size();
+ }
+
+ if (F.size(res.failedKeys()) == keyNum)
return;
/*
@@ -150,11 +163,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- for (int i = 0; i < req.size(); i++) {
- if (F.contains(skipped, i))
+ for (int i = 0; i < keyNum; i++) {
+ int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+ if (F.contains(skipped, trueIdx))
continue;
- KeyCacheObject key = req.key(i);
+ KeyCacheObject key = req.key(trueIdx);
if (F.contains(failed, key))
continue;
@@ -170,7 +185,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
CacheObject val = null;
- if (F.contains(nearValsIdxs, i)) {
+ if (F.contains(nearValsIdxs, trueIdx)) {
val = res.nearValue(nearValIdx);
nearValIdx++;
@@ -179,7 +194,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
assert req.operation() != TRANSFORM;
if (req.operation() != DELETE)
- val = req.value(i);
+ val = req.value(trueIdx);
}
long ttl = res.nearTtl(i);
[07/10] ignite git commit: tmp
Posted by sb...@apache.org.
tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f518395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f518395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f518395
Branch: refs/heads/ignite-4680-sb
Commit: 5f51839525a839c1eec9c28aa7772cc9f1bc59c1
Parents: 3e7ee08
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 15:54:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 15:54:56 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 33 ++++++++++++++++++++
.../dht/atomic/GridDhtAtomicCache.java | 15 ++++++---
.../atomic/GridNearAtomicFullUpdateRequest.java | 1 +
.../apache/ignite/internal/util/MPSCQueue.java | 8 ++---
4 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 39c514b..6dad30b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -55,11 +55,13 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearAtomicResponseHelper;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
+import org.apache.ignite.internal.util.MPSCQueue;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -201,6 +203,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
};
+ private Thread resThread;
+
+ private MPSCQueue<Runnable> q;
+
/**
* @param ctx Grid kernal context.
*/
@@ -221,6 +227,26 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
synchronized (sysLsnrsMux) {
sysLsnrs = new GridMessageListener[GridTopic.values().length];
}
+
+ resThread = new Thread() {
+ public void run() {
+ while (true) {
+ try {
+ Runnable r = q.take();
+
+ r.run();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ q = new MPSCQueue<>(resThread);
+
+ resThread.setDaemon(true);
+ resThread.start();
}
/**
@@ -823,6 +849,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
+// if (msg.message() instanceof GridNearAtomicUpdateResponse) {
+// q.add(c);
+//
+// return;
+// }
+
+
if (plc == GridIoPolicy.SYSTEM_POOL &&
(msg.partition() != Integer.MIN_VALUE ||
msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 973256f..dcc79d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1761,6 +1761,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
+ private GridCacheVersion ver;
+
+
/**
* Executes local update after preloader fetched values.
*
@@ -1837,13 +1840,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
// Assign next version for update inside entries lock.
- GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+ if (ver == null)
+ ver = ctx.versions().next(top.topologyVersion());
if (hasNear)
res.nearVersion(ver);
@@ -1859,7 +1863,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
- dhtFut = createDhtFuture(ver, req, size);
+ dhtFut = null;//createDhtFuture(ver, req, size);
expiry = expiryPolicy(req.expiry());
@@ -1977,9 +1981,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
- else
+ else {
if (dhtFut != null)
dhtFut.map(node, res.returnValue(), res, completionCb);
+ else
+ completionCb.apply(req, res);
+ }
if (req.writeSynchronizationMode() != FULL_ASYNC)
req.cleanup(!node.isLocal());
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 2e619ee..ce6035e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -406,6 +406,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override @Nullable public Map<Integer, int[]> stripeMap() {
+ //stripeMap = null;
return stripeMap;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
index 5505b3a..5725390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
@@ -41,7 +41,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
/** */
final AtomicReference<Node> putStack = new AtomicReference<>();
/** */
- private final AtomicInteger takeStackSize = new AtomicInteger();
+ //private final AtomicInteger takeStackSize = new AtomicInteger();
/** */
private Thread consumerThread;
@@ -189,7 +189,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
private void dequeue() {
takeStack[takeStackIndex] = null;
takeStackIndex++;
- takeStackSize.lazySet(takeStackSize.get() - 1);
+ //takeStackSize.lazySet(takeStackSize.get() - 1);
}
/** */
@@ -248,7 +248,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
private void copyIntoTakeStack(Node putStackHead) {
int putStackSize = putStackHead.size;
- takeStackSize.lazySet(putStackSize);
+ //takeStackSize.lazySet(putStackSize);
if (putStackSize > takeStack.length)
takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
@@ -270,7 +270,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
@Override public int size() {
Node h = putStack.get();
int putStackSize = h == null ? 0 : h.size;
- return putStackSize + takeStackSize.get();
+ return putStackSize + 0;//takeStackSize.get();
}
/** {@inheritDoc}. */
[10/10] ignite git commit: tmp
Posted by sb...@apache.org.
tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e59edc93
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e59edc93
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e59edc93
Branch: refs/heads/ignite-4680-sb
Commit: e59edc93098eb63457953cf92b5ec4bad789c0ca
Parents: ae9737b
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 18:00:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 18:00:24 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 12 ++------
.../atomic/GridNearAtomicFullUpdateRequest.java | 32 +++-----------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 ---
3 files changed, 6 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e59edc93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bcfea79..f2a251d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -212,9 +212,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updateReplyClos = new UpdateReplyClosure() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
- if (req.writeSynchronizationMode() != FULL_ASYNC) {
+ if (req.writeSynchronizationMode() != FULL_ASYNC)
sendNearUpdateReply(res.nodeId(), res);
- }
else {
if (res.remapTopologyVersion() != null)
// Remap keys on primary node in FULL_ASYNC mode.
@@ -280,17 +279,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req
) {
- int stripeIdx;
- Thread curTrd = Thread.currentThread();
- if (req instanceof GridNearAtomicFullUpdateRequest && curTrd instanceof IgniteThread)
- stripeIdx = ((IgniteThread)curTrd).stripe();
- else
- stripeIdx = IgniteThread.GRP_IDX_UNASSIGNED;
-
processNearAtomicUpdateRequest(
nodeId,
req,
- stripeIdx);
+ -1);
}
@Override public String toString() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e59edc93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index ce6035e..2b461ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -129,9 +129,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectTransient
private int maxStripes;
- /** Partition Id */
- private int partId;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -490,14 +487,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public int partition() {
- return partId;
- }
-
- /**
- * @param partId Partition.
- */
- public void partition(int partId) {
- this.partId = partId;
+ return keys.get(0).partition();
}
/** {@inheritDoc} */
@@ -564,18 +554,12 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
writer.incrementState();
case 18:
- if (!writer.writeInt("partId", partId))
- return false;
-
- writer.incrementState();
-
- case 19:
if (!writer.writeMap("stripeMap", stripeMap, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
- case 20:
+ case 19:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -662,14 +646,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 18:
- partId = reader.readInt("partId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
stripeMap = reader.readMap("stripeMap", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
@@ -677,7 +653,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 20:
+ case 19:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -709,7 +685,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 21;
+ return 20;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e59edc93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 8774d5f..2685ef6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -982,8 +982,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
- int part = (int)(futId & 0xFFFF);
-
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null)
@@ -1076,8 +1074,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
stripes
);
- req.partition(part);
-
mapped = new PrimaryRequestState(req, nodes, false);
pendingMappings.put(nodeId, mapped);