You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:46 UTC

[01/10] ignite git commit: ignite-4680-2

Repository: ignite
Updated Branches:
  refs/heads/ignite-4680-sb [created] e59edc930


http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 5a8904f..1a87ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -41,6 +41,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     /** Timestamp used as base time for cache topology version (January 1, 2014). */
     public static final long TOP_VER_BASE_TIME = 1388520000000L;
 
+    /** Number of atomic IDs a thread reserves from the global counter at once. Must be a power of two. */
+    protected static final int THREAD_RESERVE_SIZE = 0x4000;
+
     /**
      * Current order. Initialize to current time to make sure that
      * local version increments even after restarts.
@@ -63,6 +66,16 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     /** */
     private GridCacheVersion ISOLATED_STREAMER_VER;
 
+    /** Global atomic id counter. */
+    protected final AtomicLong globalAtomicCnt = new AtomicLong();
+
+    /** Per thread atomic id counter. */
+    private final ThreadLocal<LongWrapper> threadAtomicVersionCnt = new ThreadLocal<LongWrapper>() {
+        @Override protected LongWrapper initialValue() {
+            return new LongWrapper(globalAtomicCnt);
+        }
+    };
+
     /** */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -304,4 +317,38 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     public GridCacheVersion last() {
         return last;
     }
+
+    /**
+     * @return Next future Id for atomic futures.
+     */
+    public long nextAtomicFutureVersion() {
+        LongWrapper cnt = threadAtomicVersionCnt.get();
+        return cnt.getNext();
+    }
+
+    /** Thread-local ID source that takes ranges of THREAD_RESERVE_SIZE values from the global counter. */
+    private static class LongWrapper {
+        /** */
+        private long val;
+        private final AtomicLong globalCnt;
+
+        /** */
+        public LongWrapper(AtomicLong globalCnt) {
+            assert THREAD_RESERVE_SIZE > 1 && (THREAD_RESERVE_SIZE & (THREAD_RESERVE_SIZE - 1)) == 0 :
+                "THREAD_RESERVE_SIZE must be power of two";
+
+            this.globalCnt = globalCnt;
+            val = globalCnt.getAndAdd(THREAD_RESERVE_SIZE);
+        }
+
+        /** */
+        public long getNext() {
+            long res = val++;
+
+            if ((val & (THREAD_RESERVE_SIZE - 1)) == 0)
+                val = globalCnt.getAndAdd(THREAD_RESERVE_SIZE);
+
+            return res;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
new file mode 100644
index 0000000..5505b3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.concurrent.locks.LockSupport.park;
+import static java.util.concurrent.locks.LockSupport.unpark;
+
+/**
+ * @param <E> Element type.
+ */
+public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
+
+    /** */
+    static final int INITIAL_ARRAY_SIZE = 512;
+    /** */
+    static final Node BLOCKED = new Node();
+
+    /** */
+    final AtomicReference<Node> putStack = new AtomicReference<>();
+    /** */
+    private final AtomicInteger takeStackSize = new AtomicInteger();
+
+    /** */
+    private Thread consumerThread;
+    /** */
+    private Object[] takeStack = new Object[INITIAL_ARRAY_SIZE];
+    /** */
+    private int takeStackIndex = -1;
+
+    /**
+     */
+    public MPSCQueue(Thread consumerThread) {
+        assert consumerThread != null;
+        this.consumerThread = consumerThread;
+    }
+
+    /**
+     */
+    public MPSCQueue() {
+    }
+
+    /**
+     * Sets the consumer thread.
+     *
+     * The consumer thread is needed for blocking, so that an offering thread knows which thread
+     * to wake up. There can only be a single consumerThread and this method should be called
+     * before the queue is safely published. It will not provide a happens-before relation on
+     * its own.
+     *
+     * @param consumerThread the consumer thread.
+     * @throws AssertionError if consumerThread is null and assertions are enabled.
+     */
+    public void setConsumerThread(Thread consumerThread) {
+        assert consumerThread != null;
+        this.consumerThread = consumerThread;
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * This call is thread-safe, but it only removes the items that are on the put-stack.
+     */
+    @Override public void clear() {
+        putStack.set(BLOCKED);
+    }
+
+    /** {@inheritDoc}. */
+    @Override public boolean offer(E item) {
+        assert item != null : "item can't be null";
+
+        AtomicReference<Node> putStack = this.putStack;
+        Node newHead = new Node();
+        newHead.item = item;
+
+        for (; ; ) {
+            Node oldHead = putStack.get();
+            if (oldHead == null || oldHead == BLOCKED) {
+                newHead.next = null;
+                newHead.size = 1;
+            }
+            else {
+                newHead.next = oldHead;
+                newHead.size = oldHead.size + 1;
+            }
+
+            if (!putStack.compareAndSet(oldHead, newHead))
+                continue;
+
+            if (oldHead == BLOCKED)
+                unpark(consumerThread);
+
+            return true;
+        }
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E peek() {
+        E item = peekNext();
+        if (item != null)
+            return item;
+
+        if (!drainPutStack())
+            return null;
+
+        return peekNext();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E take() throws InterruptedException {
+        E item = next();
+
+        if (item != null)
+            return item;
+
+        takeAll();
+        assert takeStackIndex == 0;
+        assert takeStack[takeStackIndex] != null;
+
+        return next();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E poll() {
+        E item = next();
+
+        if (item != null)
+            return item;
+
+        if (!drainPutStack())
+            return null;
+
+        return next();
+    }
+
+    /** */
+    private E next() {
+        E item = peekNext();
+
+        if (item != null)
+            dequeue();
+
+        return item;
+    }
+
+    /** */
+    private E peekNext() {
+        if (takeStackIndex == -1)
+            return null;
+
+        if (takeStackIndex == takeStack.length) {
+            takeStackIndex = -1;
+            return null;
+        }
+
+        E item = (E)takeStack[takeStackIndex];
+
+        if (item == null) {
+            takeStackIndex = -1;
+            return null;
+        }
+
+        return item;
+    }
+
+    /** */
+    private void dequeue() {
+        takeStack[takeStackIndex] = null;
+        takeStackIndex++;
+        takeStackSize.lazySet(takeStackSize.get() - 1);
+    }
+
+    /** */
+    private void takeAll() throws InterruptedException {
+        long iteration = 0;
+        AtomicReference<Node> putStack = this.putStack;
+
+        for (; ; ) {
+            if (consumerThread != null && consumerThread.isInterrupted()) {
+                putStack.compareAndSet(BLOCKED, null);
+                throw new InterruptedException();
+            }
+
+            Node currentPutStackHead = putStack.get();
+
+            if (currentPutStackHead == null) {
+                // There is nothing to take, so let's block.
+                if (!putStack.compareAndSet(null, BLOCKED)) {
+                    // we are lucky, something is available
+                    continue;
+                }
+
+                // Let's block for real.
+                park();
+            }
+            else if (currentPutStackHead == BLOCKED)
+                park();
+
+            else {
+                if (!putStack.compareAndSet(currentPutStackHead, null))
+                    continue;
+
+                copyIntoTakeStack(currentPutStackHead);
+                break;
+            }
+            iteration++;
+        }
+    }
+
+    /** */
+    private boolean drainPutStack() {
+        for (; ; ) {
+            Node head = putStack.get();
+
+            if (head == null)
+                return false;
+
+            if (putStack.compareAndSet(head, null)) {
+                copyIntoTakeStack(head);
+                return true;
+            }
+        }
+    }
+
+    /** */
+    private void copyIntoTakeStack(Node putStackHead) {
+        int putStackSize = putStackHead.size;
+
+        takeStackSize.lazySet(putStackSize);
+
+        if (putStackSize > takeStack.length)
+            takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
+
+        for (int i = putStackSize - 1; i >= 0; i--) {
+            takeStack[i] = putStackHead.item;
+            putStackHead = putStackHead.next;
+        }
+
+        takeStackIndex = 0;
+        assert takeStack[0] != null;
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * Best effort implementation.
+     */
+    @Override public int size() {
+        Node h = putStack.get();
+        int putStackSize = h == null ? 0 : h.size;
+        return putStackSize + takeStackSize.get();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    /** {@inheritDoc}. */
+    @Override public void put(E e) throws InterruptedException {
+        offer(e);
+    }
+
+    /** {@inheritDoc}. */
+    @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+        add(e);
+        return true;
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    /** {@inheritDoc}. */
+    @Override public int drainTo(Collection<? super E> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public int drainTo(Collection<? super E> c, int maxElements) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public Iterator<E> iterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** */
+    private static int nextPowerOfTwo(final int value) {
+        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+    }
+
+    /** */
+    private static final class Node<E> {
+        /** */
+        Node next;
+        /** */
+        E item;
+        /** */
+        int size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 6c85b32..af474ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -38,8 +38,9 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteStripeThread;
 import org.jetbrains.annotations.NotNull;
+import org.jsr166.LongAdder8;
 
 /**
  * Striped executor.
@@ -75,7 +76,7 @@ public class StripedExecutor implements ExecutorService {
 
         try {
             for (int i = 0; i < cnt; i++) {
-                stripes[i] = new StripeConcurrentQueue(
+                stripes[i] = new StripeMPSCQueue(
                     igniteInstanceName,
                     poolName,
                     i,
@@ -146,6 +147,31 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /**
+     * @return Metrics.
+     */
+    public String getMetrics() {
+        GridStringBuilder sb = new GridStringBuilder();
+        sb.a("completed");
+        GridStringBuilder sb2 = new GridStringBuilder();
+        sb2.a("park");
+        GridStringBuilder sb3 = new GridStringBuilder();
+        sb3.a("unpark");
+        GridStringBuilder sb4 = new GridStringBuilder();
+        sb4.a("queue");
+
+        for (int i = 0; i < stripes.length; i++) {
+            Stripe stripe = stripes[i];
+
+            sb.a(':').a(stripe.completedCnt);
+            sb2.a(':').a(stripe.parkCntr());
+            sb3.a(':').a(stripe.unparkCntr());
+            sb4.a(':').a(stripe.queueSize());
+        }
+
+        return sb.a(' ').a(sb2).a(' ').a(sb3).a(' ').a(sb4).toString();
+    }
+
+    /**
      * @return Stripes count.
      */
     public int stripes() {
@@ -435,10 +461,9 @@ public class StripedExecutor implements ExecutorService {
          * Starts the stripe.
          */
         void start() {
-            thread = new IgniteThread(igniteInstanceName,
+            thread = new IgniteStripeThread(igniteInstanceName,
                 poolName + "-stripe-" + idx,
                 this,
-                IgniteThread.GRP_IDX_UNASSIGNED,
                 idx);
 
             thread.start();
@@ -524,6 +549,20 @@ public class StripedExecutor implements ExecutorService {
          */
         abstract String queueToString();
 
+        /**
+         * @return Number of park ops.
+         */
+        public long parkCntr() {
+            return 0;
+        }
+
+        /**
+         * @return Number of unpark ops.
+         */
+        public long unparkCntr() {
+            return 0;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(Stripe.class, this);
@@ -540,6 +579,12 @@ public class StripedExecutor implements ExecutorService {
         /** */
         private volatile boolean parked;
 
+        /** */
+        private volatile long parkCntr;
+
+        /** */
+        private final LongAdder8 unparkCntr = new LongAdder8();
+
         /**
          * @param igniteInstanceName Ignite instance name.
          * @param poolName Pool name.
@@ -578,6 +623,7 @@ public class StripedExecutor implements ExecutorService {
                     if (r != null)
                         return r;
 
+                    parkCntr++;
                     LockSupport.park();
 
                     if (Thread.interrupted())
@@ -593,8 +639,10 @@ public class StripedExecutor implements ExecutorService {
         void execute(Runnable cmd) {
             queue.add(cmd);
 
-            if (parked)
+            if (parked) {
+                unparkCntr.increment();
                 LockSupport.unpark(thread);
+            }
         }
 
         /** {@inheritDoc} */
@@ -608,6 +656,16 @@ public class StripedExecutor implements ExecutorService {
         }
 
         /** {@inheritDoc} */
+        @Override public long parkCntr() {
+            return parkCntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long unparkCntr() {
+            return unparkCntr.sum();
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(StripeConcurrentQueue.class, this, super.toString());
         }
@@ -719,4 +777,61 @@ public class StripedExecutor implements ExecutorService {
             return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
         }
     }
+
+    /**
+     * Stripe that keeps its tasks in an {@link MPSCQueue}.
+     */
+    private static class StripeMPSCQueue extends Stripe {
+        /** Queue. */
+        private final MPSCQueue<Runnable> queue = new MPSCQueue<>();
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeMPSCQueue(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
+        /** {@inheritDoc} */
+        @Override void start() {
+            super.start();
+            queue.setConsumerThread(thread);
+        }
+
+        /** {@inheritDoc} */
+        @Override Runnable take() throws InterruptedException {
+            return queue.take();
+        }
+
+        /** {@inheritDoc} */
+        void execute(Runnable cmd) {
+            queue.add(cmd);
+        }
+
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return String.valueOf(queue);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeMPSCQueue.class, this, super.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
new file mode 100644
index 0000000..75b655f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.thread;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Ignite thread that additionally carries a stripe index; created by {@code StripedExecutor} stripes.
+ */
+public class IgniteStripeThread extends IgniteThread {
+
+    /** Stripe index. */
+    private final int stripeIdx;
+
+    /** Creates a stripe thread from the given grid name, thread name, runnable and stripe index. */
+    public IgniteStripeThread(String gridName, String threadName, Runnable r, int stripeIdx) {
+        super(gridName, threadName, r);
+        this.stripeIdx = stripeIdx;
+    }
+
+    /**
+     * @return Stripe index.
+     */
+    public int stripeIndex() {
+        return stripeIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteStripeThread.class, this, "name", getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
new file mode 100644
index 0000000..d6e7537
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Trivial GridNearAtomicFullUpdateRequest tests.
+ */
+public class GridNearAtomicFullUpdateRequestTest extends MarshallingAbstractTest {
+    /**
+     * Message marshalling test.
+     *
+     * @throws IgniteCheckedException If fails.
+     */
+    public void testMarshall() throws IgniteCheckedException {
+        GridCacheVersion updVer = new GridCacheVersion(1, 2, 3, 5);
+
+        int entryNum = 3;
+
+        GridNearAtomicFullUpdateRequest msg = new GridNearAtomicFullUpdateRequest(555,
+            UUID.randomUUID(),
+            555L,
+            new AffinityTopologyVersion(25, 5),
+            false,
+            CacheWriteSynchronizationMode.PRIMARY_SYNC,
+            GridCacheOperation.UPDATE,
+            false,
+            null,
+            null,
+            null,
+            null,
+            555,
+            false,
+            false,
+            true,
+            false,
+            3,
+            entryNum);
+
+        for (int i = 0; i < entryNum; i++)
+            msg.addUpdateEntry(
+                key(i, i),
+                val(i),
+                -1,
+                -1,
+                null
+            );
+
+        GridNearAtomicFullUpdateRequest received = marshalUnmarshal(msg);
+
+        assertEquals(555, received.futureId());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
new file mode 100644
index 0000000..f53a32b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryContext;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
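+/**
+ * Base class for message marshalling round-trip tests; sets up mocked cache contexts and a binary marshaller.
+ */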
+public abstract class MarshallingAbstractTest extends TestCase {
+    /** */
+    private static final byte proto = 2;
+
+    /** */
+    private GridCacheSharedContext ctx;
+
+    /** */
+    private GridCacheContext cctx;
+
+    /** */
+    @Override protected void setUp() throws Exception {
+        super.setUp();
+        ctx = mock(GridCacheSharedContext.class);
+        cctx = mock(GridCacheContext.class);
+
+        CacheObjectBinaryContext coctx = mock(CacheObjectBinaryContext.class);
+        GridKernalContext kctx = mock(GridKernalContext.class);
+        IgniteEx ignite = mock(IgniteEx.class);
+        GridCacheProcessor proc = mock(GridCacheProcessor.class);
+        GridCacheSharedContext shared = mock(GridCacheSharedContext.class);
+
+        Marshaller marsh = new BinaryMarshaller();
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+        cfg.setMarshaller(marsh);
+
+        when(ctx.cacheContext(anyInt())).thenReturn(cctx);
+        when(ctx.marshaller()).thenReturn(marsh);
+        when(ctx.gridConfig()).thenReturn(cfg);
+
+        when(cctx.cacheObjectContext()).thenReturn(coctx);
+        when(cctx.gridConfig()).thenReturn(cfg);
+        when(cctx.shared()).thenReturn(shared);
+
+        when(cctx.grid()).thenReturn(ignite);
+        when(kctx.grid()).thenReturn(ignite);
+
+        when(ignite.configuration()).thenReturn(cfg);
+
+        when(ctx.kernalContext()).thenReturn(kctx);
+        when(cctx.kernalContext()).thenReturn(kctx);
+        when(coctx.kernalContext()).thenReturn(kctx);
+
+        when(coctx.binaryEnabled()).thenReturn(true);
+
+        when(kctx.cache()).thenReturn(proc);
+        when(kctx.config()).thenReturn(cfg);
+
+        when(proc.context()).thenReturn(ctx);
+
+        IgniteCacheObjectProcessor binaryProcessor = new CacheObjectBinaryProcessorImpl(kctx);
+        when(kctx.cacheObjects()).thenReturn(binaryProcessor);
+
+        // init marshaller
+        marsh.setContext(new MarshallerContextTestImpl());
+
+        when(shared.marshaller()).thenReturn(marsh);
+
+        BinaryContext bctx =
+            new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), new NullLogger());
+
+        IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", bctx, new IgniteConfiguration());
+    }
+
+    /**
+     * @param m Message.
+     * @return Unmarshalled message.
+     */
+    protected <T extends GridCacheMessage> T marshalUnmarshal(T m) throws IgniteCheckedException {
+        ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
+
+        m.prepareMarshal(ctx);
+        m.writeTo(buf, new DirectMessageWriter(proto));
+
+        System.out.println("Binary size: " + buf.position() + " bytes");
+        buf.flip();
+
+        byte type = buf.get();
+        assertEquals(m.directType(), type);
+
+        MessageFactory msgFactory = new GridIoMessageFactory(null);
+
+        Message mx = msgFactory.create(type);
+        mx.readFrom(buf, new DirectMessageReader(msgFactory, proto));
+        ((GridCacheMessage)mx).finishUnmarshal(ctx, U.gridClassLoader());
+
+        return (T)mx;
+    }
+
+    /** */
+    protected KeyCacheObject key(Object val, int part) {
+        return new KeyCacheObjectImpl(val, null, part);
+    }
+
+    protected CacheObject val(Object val) {
+        return new CacheObjectImpl(val, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
new file mode 100644
index 0000000..f555750
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import junit.framework.TestCase;
+
+/**
+ * Simple tests for {@link GridCacheVersionManager}.
+ */
+public class GridCacheVersionManagerTest extends TestCase {
+    /**
+     * Test for {@link GridCacheVersionManager#nextAtomicFutureVersion()}.
+     */
+    public void testNextAtomicIdMonotonicalGrows() {
+        GridCacheVersionManager mgr = new GridCacheVersionManager();
+        int n = GridCacheVersionManager.THREAD_RESERVE_SIZE;
+
+        for (int i = 0; i < n * 3 + 5; i++) {
+            long l = mgr.nextAtomicFutureVersion();
+            assertEquals(i, l);
+        }
+
+        assertEquals(n * 4, mgr.globalAtomicCnt.get());
+    }
+
+    /**
+     * Test for {@link GridCacheVersionManager#nextAtomicFutureVersion()} with multiple threads.
+     *
+     * @throws InterruptedException If interrupted while waiting for the worker threads to finish.
+     */
+    public void testNextAtomicMultiThread() throws InterruptedException {
+        int threadsNum = 3;
+
+        final GridCacheVersionManager mgr = new GridCacheVersionManager();
+        final int n = GridCacheVersionManager.THREAD_RESERVE_SIZE;
+        final int perThreadNum = n * 2 + 10;
+
+        final int[] vals = new int[n * threadsNum * (perThreadNum / n + 1)];
+
+        ExecutorService executorService = Executors.newFixedThreadPool(threadsNum);
+
+        final CountDownLatch latch = new CountDownLatch(threadsNum);
+
+        for (int i = 0; i < threadsNum; i++) {
+            executorService.submit(new Runnable() {
+                @Override public void run() {
+                    for (int i = 0; i < perThreadNum; i++) {
+                        long l = mgr.nextAtomicFutureVersion();
+                        vals[(int)l]++;
+                    }
+                    latch.countDown();
+                }
+            });
+        }
+
+        latch.await();
+        executorService.shutdown();
+
+        int cnt = 0;
+
+        for (int i = 0; i < vals.length; i++) {
+            if (vals[i] > 0)
+                cnt++;
+        }
+
+        assertEquals(threadsNum * perThreadNum, cnt);
+    }
+}


[09/10] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4680-tmp

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4680-tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae9737b4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae9737b4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae9737b4

Branch: refs/heads/ignite-4680-sb
Commit: ae9737b4293479a976bd11fe7169a2e78f0dd1e0
Parents: 3200c2e 9020d12
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 17:50:18 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 17:50:18 2017 +0300

----------------------------------------------------------------------
 .../ClientAbstractMultiNodeSelfTest.java        |   13 +-
 .../ignite/internal/IgniteTransactionsEx.java   |    8 +-
 .../processors/cache/GridCacheAdapter.java      |   98 +-
 .../processors/cache/GridCacheProxyImpl.java    |    6 +-
 .../cache/GridCacheSharedContext.java           |   11 +-
 .../processors/cache/GridCacheUtils.java        |    6 +-
 .../processors/cache/IgniteInternalCache.java   |    5 +-
 .../distributed/GridCacheCommittedTxInfo.java   |  117 -
 .../GridDistributedCacheAdapter.java            |    2 +-
 .../GridDistributedTxRemoteAdapter.java         |   59 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |    4 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   57 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  126 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   28 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   65 +-
 .../dht/colocated/GridDhtColocatedCache.java    |    8 +-
 .../colocated/GridDhtColocatedLockFuture.java   |    7 +-
 .../distributed/near/GridNearLockFuture.java    |    4 +-
 .../distributed/near/GridNearLockRequest.java   |  200 +-
 .../near/GridNearTransactionalCache.java        |    6 +-
 .../near/GridNearTxFinishFuture.java            |    4 +-
 .../cache/distributed/near/GridNearTxLocal.java | 2732 ++++++++++++++++-
 .../near/GridNearTxPrepareFutureAdapter.java    |    5 +-
 .../near/GridNearTxPrepareRequest.java          |    2 +-
 .../distributed/near/GridNearTxRemote.java      |    4 +-
 .../store/GridCacheStoreManagerAdapter.java     |  142 +-
 .../cache/transactions/IgniteInternalTx.java    |   80 +-
 .../transactions/IgniteTransactionsImpl.java    |   12 +-
 .../cache/transactions/IgniteTxAdapter.java     |  165 +-
 .../cache/transactions/IgniteTxEntry.java       |    4 +-
 .../cache/transactions/IgniteTxHandler.java     |   67 +-
 .../IgniteTxImplicitSingleStateImpl.java        |    4 +-
 .../transactions/IgniteTxLocalAdapter.java      | 2801 ++----------------
 .../cache/transactions/IgniteTxLocalEx.java     |  145 +-
 .../cache/transactions/IgniteTxManager.java     |  293 +-
 .../cache/transactions/IgniteTxRemoteEx.java    |   11 +
 .../IgniteTxRemoteStateAdapter.java             |    2 +-
 .../cache/transactions/IgniteTxState.java       |    2 +-
 .../cache/transactions/IgniteTxStateImpl.java   |    4 +-
 .../transactions/TransactionProxyImpl.java      |   13 +-
 .../datastructures/DataStructuresProcessor.java |   32 +-
 .../datastructures/GridCacheAtomicLongImpl.java |   18 +-
 .../GridCacheAtomicReferenceImpl.java           |    6 +-
 .../GridCacheAtomicSequenceImpl.java            |    4 +-
 .../GridCacheAtomicStampedImpl.java             |    6 +-
 .../GridCacheCountDownLatchImpl.java            |    6 +-
 .../datastructures/GridCacheLockImpl.java       |   11 +-
 .../datastructures/GridCacheSemaphoreImpl.java  |   18 +-
 .../GridTransactionalCacheQueueImpl.java        |   10 +-
 .../processors/igfs/IgfsDataManager.java        |   55 +-
 .../processors/igfs/IgfsMetaManager.java        |   73 +-
 .../service/GridServiceProcessor.java           |    6 +-
 .../internal/TestRecordingCommunicationSpi.java |   29 +
 .../processors/cache/GridCacheTestStore.java    |    2 -
 .../cache/IgniteTxConfigCacheSelfTest.java      |    4 +-
 .../IgniteCacheSystemTransactionsSelfTest.java  |    7 +-
 .../IgniteTxCachePrimarySyncTest.java           |    5 +
 ...xOriginatingNodeFailureAbstractSelfTest.java |    6 +-
 ...cOriginatingNodeFailureAbstractSelfTest.java |    4 +-
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |    9 +-
 .../dht/IgniteCacheTxRecoveryRollbackTest.java  |  501 ++++
 .../GridCachePartitionedTxSalvageSelfTest.java  |    7 +-
 .../TxOptimisticDeadlockDetectionTest.java      |   30 +-
 ...lockMessageSystemPoolStarvationSelfTest.java |    6 +-
 .../IgniteCacheRestartTestSuite2.java           |    3 +-
 .../IgniteCacheTxRecoverySelfTestSuite.java     |    3 +
 .../apache/ignite/util/GridRandomSelfTest.java  |   17 -
 .../HibernateReadWriteAccessStrategy.java       |   12 +-
 .../processors/cache/jta/CacheJtaManager.java   |    3 +-
 .../processors/cache/jta/CacheJtaResource.java  |    8 +-
 .../Plugin/PluginTest.cs                        |   24 +
 .../Impl/Plugin/PluginContext.cs                |    9 +
 .../Apache.Ignite.Core/Plugin/IPluginContext.cs |    8 +
 73 files changed, 4191 insertions(+), 4073 deletions(-)
----------------------------------------------------------------------



[03/10] ignite git commit: ignite-4680-2 wip

Posted by sb...@apache.org.
ignite-4680-2 wip


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8fa6dd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8fa6dd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8fa6dd4

Branch: refs/heads/ignite-4680-sb
Commit: d8fa6dd4fae51532a4e04ffbfe6c6fa3584520bf
Parents: d1b4ebd
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Wed Mar 15 20:44:46 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Wed Mar 15 20:44:46 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 14 +++---
 .../ignite/internal/util/StripedExecutor.java   |  5 ++-
 .../ignite/thread/IgniteStripeThread.java       | 47 --------------------
 3 files changed, 10 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d8fa6dd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4ab5000..aeb379a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -101,7 +101,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
-import org.apache.ignite.thread.IgniteStripeThread;
+import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
@@ -280,11 +280,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     GridNearAtomicAbstractUpdateRequest req
                 ) {
                     int stripeIdx;
-
-                    if (req instanceof GridNearAtomicFullUpdateRequest && Thread.currentThread() instanceof IgniteStripeThread)
-                        stripeIdx = ((IgniteStripeThread)Thread.currentThread()).stripeIndex();
+                    Thread curTrd = Thread.currentThread();
+                    if (req instanceof GridNearAtomicFullUpdateRequest && curTrd instanceof IgniteThread)
+                        stripeIdx = ((IgniteThread)curTrd).stripe();
                     else
-                        stripeIdx = IgniteStripeThread.GRP_IDX_UNASSIGNED;
+                        stripeIdx = IgniteThread.GRP_IDX_UNASSIGNED;
 
                     processNearAtomicUpdateRequest(
                         nodeId,
@@ -1660,7 +1660,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final GridNearAtomicAbstractUpdateRequest req,
         final UpdateReplyClosure completionCb
     ) {
-        updateAllAsyncInternal(nodeId, req, IgniteStripeThread.GRP_IDX_UNASSIGNED, completionCb);
+        updateAllAsyncInternal(nodeId, req, IgniteThread.GRP_IDX_UNASSIGNED, completionCb);
     }
 
     /**
@@ -1772,7 +1772,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int[] stripeIdxs = null;
 
-        if (stripeIdx != IgniteStripeThread.GRP_IDX_UNASSIGNED
+        if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
             && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
             && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
             stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8fa6dd4/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index af474ff..28bb584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -38,7 +38,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.thread.IgniteStripeThread;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
 import org.jsr166.LongAdder8;
 
@@ -461,9 +461,10 @@ public class StripedExecutor implements ExecutorService {
          * Starts the stripe.
          */
         void start() {
-            thread = new IgniteStripeThread(igniteInstanceName,
+            thread = new IgniteThread(igniteInstanceName,
                 poolName + "-stripe-" + idx,
                 this,
+                IgniteThread.GRP_IDX_UNASSIGNED,
                 idx);
 
             thread.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8fa6dd4/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
deleted file mode 100644
index 75b655f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.thread;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Class for use within {@link IgniteThreadPoolExecutor} class.
- */
-public class IgniteStripeThread extends IgniteThread {
-
-    /** Group index. */
-    private final int stripeIdx;
-
-    /** {@inheritDoc} */
-    public IgniteStripeThread(String gridName, String threadName, Runnable r, int stripeIdx) {
-        super(gridName, threadName, r);
-        this.stripeIdx = stripeIdx;
-    }
-
-    /**
-     * @return Group index.
-     */
-    public int stripeIndex() {
-        return stripeIdx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteStripeThread.class, this, "name", getName());
-    }
-}


[05/10] ignite git commit: wip

Posted by sb...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/855c66b1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/855c66b1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/855c66b1

Branch: refs/heads/ignite-4680-sb
Commit: 855c66b11b8eded1f3ff3e4ff1df587c724c5c70
Parents: f33235d
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 13:44:38 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 13:44:38 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          |  6 ++-
 .../dht/atomic/NearAtomicResponseHelper.java    | 55 +++++++++-----------
 2 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/855c66b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 05d85ad..e68d72d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -214,8 +214,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
                 if (req.writeSynchronizationMode() != FULL_ASYNC) {
                     if (req.responseHelper() != null) {
-                        if (req.responseHelper().addResponse(res))
-                            sendNearUpdateReply(res.nodeId(), req.responseHelper().response());
+                        GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+
+                        if (res0 != null)
+                            sendNearUpdateReply(res.nodeId(), res0);
                     }
                     else
                         sendNearUpdateReply(res.nodeId(), res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/855c66b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
index 9e35e8f..00c9f6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -42,45 +42,42 @@ public class NearAtomicResponseHelper {
      * @param res Response.
      * @return {@code true} if all responses added.
      */
-    public boolean addResponse(GridNearAtomicUpdateResponse res) {
+    public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
         synchronized (this) {
-            if (res.stripe() == -1) {
-                this.res = res;
-                this.res.stripe(-1);
-
-                return true;
-            }
+            if (res.stripe() == -1)
+                return res;
 
             if (stripes.remove(res.stripe())) {
-                if (this.res == null)
-                    this.res = res;
-                else {
-                    if (res.nearValuesIndexes() != null)
-                        for (int i = 0; i < res.nearValuesIndexes().size(); i++)
-                            this.res.addNearValue(
-                                res.nearValuesIndexes().get(i),
-                                res.nearValue(i),
-                                res.nearTtl(i),
-                                res.nearExpireTime(i)
-                            );
+                mergeResponse(res);
 
-                    if (res.failedKeys() != null)
-                        this.res.addFailedKeys(res.failedKeys(), null);
-
-                    if (res.skippedIndexes() != null)
-                        this.res.skippedIndexes().addAll(res.skippedIndexes());
-                }
-                return stripes.isEmpty();
+                return stripes.isEmpty() ? this.res : null;
             }
 
-            return false;
+            return null;
         }
     }
 
     /**
-     * @return Response.
+     * @param res Response to merge into the accumulated response.
      */
-    public GridNearAtomicUpdateResponse response() {
-        return res;
+    private void mergeResponse(GridNearAtomicUpdateResponse res) {
+        if (this.res == null)
+            this.res = res;
+        else {
+            if (res.nearValuesIndexes() != null)
+                for (int i = 0; i < res.nearValuesIndexes().size(); i++)
+                    this.res.addNearValue(
+                        res.nearValuesIndexes().get(i),
+                        res.nearValue(i),
+                        res.nearTtl(i),
+                        res.nearExpireTime(i)
+                    );
+
+            if (res.failedKeys() != null)
+                this.res.addFailedKeys(res.failedKeys(), null);
+
+            if (res.skippedIndexes() != null)
+                this.res.skippedIndexes().addAll(res.skippedIndexes());
+        }
     }
 }


[04/10] ignite git commit: ignite-4680 one-message-back

Posted by sb...@apache.org.
ignite-4680 one-message-back


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f33235db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f33235db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f33235db

Branch: refs/heads/ignite-4680-sb
Commit: f33235dbda1b3081468b2d00a5337d27f77aa930
Parents: d8fa6dd
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 13:10:06 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 13:10:06 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  5 ++
 .../dht/atomic/GridDhtAtomicCache.java          | 10 ++-
 .../GridNearAtomicAbstractUpdateFuture.java     | 62 ++++----------
 .../GridNearAtomicAbstractUpdateRequest.java    | 19 +++++
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 18 ++--
 .../atomic/GridNearAtomicUpdateResponse.java    | 17 +---
 .../dht/atomic/NearAtomicResponseHelper.java    | 86 ++++++++++++++++++++
 7 files changed, 140 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 5fd2845..39c514b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearAtomicResponseHelper;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -829,6 +830,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
 
             if (stripemap != null) {
+                GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
+
+                msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
+
                 for (Integer stripe : stripemap.keySet()) {
                     stripedExecutor.execute(stripe, c);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aeb379a..05d85ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -212,8 +212,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         updateReplyClos = new UpdateReplyClosure() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                if (req.writeSynchronizationMode() != FULL_ASYNC)
-                    sendNearUpdateReply(res.nodeId(), res);
+                if (req.writeSynchronizationMode() != FULL_ASYNC) {
+                    if (req.responseHelper() != null) {
+                        if (req.responseHelper().addResponse(res))
+                            sendNearUpdateReply(res.nodeId(), req.responseHelper().response());
+                    }
+                    else
+                        sendNearUpdateReply(res.nodeId(), res);
+                }
                 else {
                     if (res.remapTopologyVersion() != null)
                         // Remap keys on primary node in FULL_ASYNC mode.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 770999a..2d02795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -357,6 +356,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @return Response to notify about primary failure.
      */
     final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
+//        assert req == null : req;
         assert req.nodeId() != null : req;
 
         if (msgLog.isDebugEnabled()) {
@@ -426,9 +426,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private GridNearAtomicUpdateResponse res;
 
         /** */
-        private Map<Integer, GridNearAtomicUpdateResponse> resMap;
-
-        /** */
         @GridToStringInclude
         Set<UUID> dhtNodes;
 
@@ -437,7 +434,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private Set<UUID> rcvd;
 
         /** */
-        private boolean hasDhtRes;
+        private boolean hasRes;
 
         /**
          * @param req Request.
@@ -522,24 +519,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
 
         /**
-         * @return {@code True} if all near results gathered.
-         */
-        private boolean hasAllNearResults() {
-            return res != null || (resMap != null && resMap.size() == req.stripes());
-        }
-
-        public String state() {
-            return resMap != null ? resMap.size() + "/" + req.stripes() : res != null ? "res" : "no";
-        }
-
-        /**
          * @return {@code True} if all expected responses are received.
          */
         private boolean finished() {
             if (req.writeSynchronizationMode() == PRIMARY_SYNC)
-                return hasAllNearResults();
+                return hasRes;
 
-            return (dhtNodes != null && dhtNodes.isEmpty()) && hasDhtRes;
+            return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
         }
 
         /**
@@ -559,7 +545,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return req;
             }
 
-            return hasAllNearResults() ? req : null;
+            return this.res == null ? req : null;
         }
 
         /**
@@ -576,7 +562,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             if (finished())
                 return null;
 
-            return !hasAllNearResults() ? req : null;
+            return this.res == null ? req : null;
         }
 
         /**
@@ -588,7 +574,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return DhtLeftResult.NOT_DONE;
 
             if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
-                if (hasDhtRes)
+                if (hasRes)
                     return DhtLeftResult.DONE;
                 else
                     return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
@@ -609,7 +595,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return false;
 
             if (res.hasResult())
-                hasDhtRes = true;
+                hasRes = true;
 
             if (dhtNodes == null) {
                 if (rcvd == null)
@@ -631,6 +617,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
             assert !finished() : this;
 
+            hasRes = true;
+
             boolean onRes = storeResponse(res);
 
             assert onRes;
@@ -654,7 +642,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @param cctx Context.
          */
         private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
-            assert F.isEmpty(dhtNodes) || req.initMappingLocally();
+            assert dhtNodes == null || req.initMappingLocally();
 
             Set<UUID> dhtNodes0 = dhtNodes;
 
@@ -691,22 +679,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if current response was {@code null}.
          */
         private boolean storeResponse(GridNearAtomicUpdateResponse res) {
-            if (res.stripe() > -1) {
-                if (resMap == null)
-                    resMap = U.newHashMap(req.stripes());
+            if (this.res == null) {
+                this.res = res;
 
-                if (!resMap.containsKey(res.stripe())) {
-                    resMap.put(res.stripe(), res);
-                    return true;
-                }
-            }
-            else {
-                if (this.res == null) {
-                    this.res = res;
-//                    this.resMap = null;
-
-                    return true;
-                }
+                return true;
             }
 
             return false;
@@ -717,7 +693,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          */
         void resetResponse() {
             res = null;
-            resMap = null;
         }
 
         /**
@@ -727,19 +702,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             return res;
         }
 
-        /**
-         * @return Response.
-         */
-        @Nullable public Map<Integer, GridNearAtomicUpdateResponse> responses() {
-            return resMap;
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(PrimaryRequestState.class, this,
                 "primary", primaryId(),
                 "needPrimaryRes", req.needPrimaryResponse(),
-                "primaryRes", this.res != null,
+                "primaryRes", res != null,
                 "done", finished());
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index b549370..0748434 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -89,6 +89,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     @GridToStringExclude
     protected byte flags;
 
+    /** Response helper. */
+    @GridDirectTransient
+    private NearAtomicResponseHelper responseHelper;
+
     /**
      *
      */
@@ -419,6 +423,21 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
+     * @return Response helper.
+     */
+    public NearAtomicResponseHelper responseHelper() {
+        return responseHelper;
+    }
+
+    /**
+     * @param responseHelper Response helper.
+     */
+    public void responseHelper(
+        NearAtomicResponseHelper responseHelper) {
+        this.responseHelper = responseHelper;
+    }
+
+    /**
      * @param idx Key index.
      * @return Key.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9c4de66..8774d5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -371,7 +371,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        boolean rcvAll = false;
+        boolean rcvAll;
 
         synchronized (mux) {
             if (futId == -1 || futId != res.futureId())
@@ -412,8 +412,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     if (msgLog.isDebugEnabled())
                         msgLog.debug("Processed near atomic update response " +
                             "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() +
-                            ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") +
-                            " " + reqState.state() + ']');
+                            ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") + ']');
                 }
                 else
                     return;
@@ -472,18 +471,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
-                    if (reqState.responses() != null) {
-                        for (GridNearAtomicUpdateResponse res0 : reqState.responses().values()) {
-                            updateNear(reqState.req, res0);
-                        }
-                    }
-                    else {
-                        GridNearAtomicUpdateResponse res0 = reqState.response();
+                    GridNearAtomicUpdateResponse res0 = reqState.response();
 
-                        assert res0 != null : reqState;
+                    assert res0 != null : reqState;
 
-                        updateNear(reqState.req, res0);
-                    }
+                    updateNear(reqState.req, res0);
                 }
             }
             else if (!nodeErr)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index f8b45d3..f0f954c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -97,6 +97,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     private int partId = -1;
 
     /** Stripe. */
+    @GridDirectTransient
     private int stripe = -1;
 
     /** */
@@ -548,12 +549,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 writer.incrementState();
 
-            case 15:
-                if (!writer.writeInt("stripe", stripe))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -666,14 +661,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 15:
-                stripe = reader.readInt("stripe");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
@@ -686,7 +673,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 15;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
new file mode 100644
index 0000000..9e35e8f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Accumulates per-stripe near atomic update responses into a single merged response.
+ */
+public class NearAtomicResponseHelper {
+
+    /** Response accumulated so far ({@code null} until the first response is added). */
+    private GridNearAtomicUpdateResponse res;
+
+    /** Stripes for which a response has not been added yet. */
+    private Set<Integer> stripes;
+
+    /**
+     * @param stripes Stripes a response is expected from (copied into an internal set).
+     */
+    public NearAtomicResponseHelper(Set<Integer> stripes) {
+        this.stripes = new HashSet<>(stripes);
+    }
+
+    /**
+     * @param res Response; with stripe {@code -1} it replaces the stored one, otherwise it is merged into it.
+     * @return {@code true} if stripe is {@code -1} or no stripes remain; {@code false} otherwise (incl. unexpected stripe).
+     */
+    public boolean addResponse(GridNearAtomicUpdateResponse res) {
+        synchronized (this) {
+            if (res.stripe() == -1) {
+                this.res = res;
+                this.res.stripe(-1);
+
+                return true;
+            }
+
+            if (stripes.remove(res.stripe())) {
+                if (this.res == null)
+                    this.res = res;
+                else {
+                    if (res.nearValuesIndexes() != null)
+                        for (int i = 0; i < res.nearValuesIndexes().size(); i++)
+                            this.res.addNearValue(
+                                res.nearValuesIndexes().get(i),
+                                res.nearValue(i),
+                                res.nearTtl(i),
+                                res.nearExpireTime(i)
+                            );
+
+                    if (res.failedKeys() != null)
+                        this.res.addFailedKeys(res.failedKeys(), null);
+
+                    if (res.skippedIndexes() != null)
+                        this.res.skippedIndexes().addAll(res.skippedIndexes()); // NOTE(review): assumes this.res.skippedIndexes() is non-null - confirm.
+                }
+                return stripes.isEmpty();
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * @return Response accumulated so far, or {@code null} if none was added (read without synchronization).
+     */
+    public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
+}


[08/10] ignite git commit: tmp

Posted by sb...@apache.org.
tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3200c2e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3200c2e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3200c2e4

Branch: refs/heads/ignite-4680-sb
Commit: 3200c2e43a9b2cee1d35a19f749de50f19f5d0d4
Parents: 5f51839
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 16:59:26 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 17:46:01 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  36 +--
 .../dht/atomic/GridDhtAtomicCache.java          | 265 +++++++++----------
 .../dht/atomic/NearAtomicResponseHelper.java    |  29 +-
 3 files changed, 151 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6dad30b..17ae595 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -856,23 +856,25 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 //        }
 
 
-        if (plc == GridIoPolicy.SYSTEM_POOL &&
-            (msg.partition() != Integer.MIN_VALUE ||
-                msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
-            Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
-                ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
-
-            if (stripemap != null) {
-                GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
-
-                msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
-
-                for (Integer stripe : stripemap.keySet()) {
-                    stripedExecutor.execute(stripe, c);
-                }
-            }
-            else
-                stripedExecutor.execute(msg.partition(), c);
+        if (plc == GridIoPolicy.SYSTEM_POOL && (msg.partition() != Integer.MIN_VALUE)) {
+//            Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+//                ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+//
+//            if (stripemap != null) {
+//                GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
+//
+//                msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
+//
+//                for (Integer stripe : stripemap.keySet()) {
+//                    stripedExecutor.execute(stripe, c);
+//                }
+//            }
+//            else
+
+//            if (msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)
+//                stripedExecutor.execute(((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap().keySet().iterator().next(), c);
+//            else
+            stripedExecutor.execute(msg.partition(), c);
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index dcc79d0..bcfea79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -213,14 +213,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
                 if (req.writeSynchronizationMode() != FULL_ASYNC) {
-                    if (req.responseHelper() != null) {
-                        GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
-
-                        if (res0 != null)
-                            sendNearUpdateReply(res.nodeId(), res0);
-                    }
-                    else
-                        sendNearUpdateReply(res.nodeId(), res);
+                    sendNearUpdateReply(res.nodeId(), res);
                 }
                 else {
                     if (res.remapTopologyVersion() != null)
@@ -1684,22 +1677,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final int stripeIdx,
         final UpdateReplyClosure completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut;
-
-        if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
-            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
-            && req.stripeMap() != null) {
-            int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+//        if (true) {
+//            updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
+//
+//            return;
+//        }
 
-            List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
-
-            for (int i = 0; i < stripeIdxs.length; i++)
-                keys.add(req.key(stripeIdxs[i]));
-
-            forceFut = preldr.request(keys, req.topologyVersion());
-        }
-        else
-            forceFut = preldr.request(req, req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
 
         if (forceFut == null || forceFut.isDone()) {
             try {
@@ -1715,9 +1699,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 return;
             }
 
-            updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
+            updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
         }
         else {
+            if (true)
+                throw new RuntimeException("error");
+
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
                 @Override public void apply(IgniteInternalFuture<Object> fut) {
                     try {
@@ -1761,9 +1748,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         completionCb.apply(req, res);
     }
 
-    private GridCacheVersion ver;
-
-
     /**
      * Executes local update after preloader fetched values.
      *
@@ -1774,11 +1758,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void updateAllAsyncInternal0(
         UUID nodeId,
-        GridNearAtomicAbstractUpdateRequest req,
+        final GridNearAtomicAbstractUpdateRequest req,
         int stripeIdx,
-        UpdateReplyClosure completionCb
+        final UpdateReplyClosure completionCb
     ) {
-        ClusterNode node = ctx.discovery().node(nodeId);
+        final ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null) {
             U.warn(msgLog, "Skip near update request, node originated update request left [" +
@@ -1787,25 +1771,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             return;
         }
 
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
-            nodeId,
-            req.futureId(),
-            req.partition(),
-            false,
-            ctx.deploymentEnabled());
-
-        res.partition(req.partition());
-
-        int[] stripeIdxs = null;
-
-        if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
-            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
-            && req.stripeMap() != null) {
-            stripeIdxs = req.stripeMap().get(stripeIdx);
-
-            res.stripe(stripeIdx);
-        }
-
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
         GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1830,103 +1795,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 try {
                     if (top.stopping()) {
-                        addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
-                            "(cache is stopped): " + name()));
-
-                        completionCb.apply(req, res);
 
                         return;
                     }
 
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
-
-                        boolean hasNear = ctx.discovery().cacheNearNode(node, name());
-
-                        // Assign next version for update inside entries lock.
-                        if (ver == null)
-                            ver = ctx.versions().next(top.topologyVersion());
-
-                        if (hasNear)
-                            res.nearVersion(ver);
-
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Assigned update version [futId=" + req.futureId() +
-                                ", writeVer=" + ver + ']');
-                        }
-
-                        assert ver != null : "Got null version for update request: " + req;
-
-                        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
-
-                        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
-
-                        dhtFut = null;//createDhtFuture(ver, req, size);
-
-                        expiry = expiryPolicy(req.expiry());
+                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                        Map<Integer, int[]> stripemap = req.stripeMap();
 
-                        GridCacheReturn retVal = null;
+                        final GridDhtAtomicAbstractUpdateFuture fut = createDhtFuture(null, req, req.size());
 
-                        if (size > 1 &&                           // Several keys ...
-                            writeThrough() && !req.skipStore() && // and store is enabled ...
-                            !ctx.store().isLocal() &&             // and this is not local store ...
-                                                                  // (conflict resolver should be used for local store)
-                            !ctx.dr().receiveEnabled()            // and no DR.
-                            ) {
-                            // This method can only be used when there are no replicated entries in the batch.
-                            UpdateBatchResult updRes = updateWithBatch(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal,
-                                stripeIdxs);
-
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
-
-                            if (req.operation() == TRANSFORM)
-                                retVal = updRes.invokeResults();
-                        }
-                        else {
-                            UpdateSingleResult updRes = updateSingle(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal,
-                                stripeIdxs);
+                        ((GridNearAtomicFullUpdateRequest)req).responseHelper(new NearAtomicResponseHelper(stripemap.size()));
 
-                            retVal = updRes.returnValue();
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
+                            if (stripeIdx == e.getKey())
+                                update(fut, node, req, e.getValue(), completionCb);
+                            else {
+                                ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() {
+                                    @Override public void run() {
+                                        try {
+                                            update(fut, node, req, e.getValue(), completionCb);
+                                        }
+                                        catch (Exception e) {
+                                            e.printStackTrace();
+                                        }
+                                    }
+                                });
+                            }
                         }
-
-                        if (retVal == null)
-                            retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
-
-                        res.returnValue(retVal);
-
-                        if (dhtFut != null)
-                            ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
                     }
                     else {
                         // Should remap all keys.
                         remap = true;
 
-                        res.remapTopologyVersion(top.topologyVersion());
+                        //res.remapTopologyVersion(top.topologyVersion());
                     }
                 }
                 finally {
@@ -1958,16 +1861,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             remap = true;
 
-            res.remapTopologyVersion(ctx.topology().topologyVersion());
+            //res.remapTopologyVersion(ctx.topology().topologyVersion());
         }
         catch (Throwable e) {
             // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            addAllKeysAsFailed(req, res, stripeIdxs, e);
+            //addAllKeysAsFailed(req, res, stripeIdxs, e);
 
-            completionCb.apply(req, res);
+            //completionCb.apply(req, res);
 
             if (e instanceof Error)
                 throw e;
@@ -1975,23 +1878,91 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             return;
         }
 
-        if (remap) {
-            assert dhtFut == null;
-            res.stripe(-1);
+//        if (remap) {
+//            assert dhtFut == null;
+//            res.stripe(-1);
+//
+//            completionCb.apply(req, res);
+//        }
+//        else {
+//            if (dhtFut != null)
+//                dhtFut.map(node, res.returnValue(), res, completionCb);
+//            else
+//                completionCb.apply(req, res);
+//        }
+//
+//        if (req.writeSynchronizationMode() != FULL_ASYNC)
+//            req.cleanup(!node.isLocal());
+//
+//        sendTtlUpdateRequest(expiry);
+    }
+
+    private void update(
+        GridDhtAtomicAbstractUpdateFuture fut,
+        ClusterNode node,
+        GridNearAtomicAbstractUpdateRequest req,
+        int[] stripeIdxs,
+        UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+            node.id(),
+            req.futureId(),
+            req.partition(),
+            false,
+            ctx.deploymentEnabled());
 
-            completionCb.apply(req, res);
-        }
-        else {
-            if (dhtFut != null)
-                dhtFut.map(node, res.returnValue(), res, completionCb);
-            else
-                completionCb.apply(req, res);
+        List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
+
+        boolean hasNear = ctx.discovery().cacheNearNode(node, name());
+
+        // Assign next version for update inside entries lock.
+        //if (ver == null)
+        GridCacheVersion ver = ctx.versions().next(ctx.topology().topologyVersion());
+
+        if (hasNear)
+            res.nearVersion(ver);
+
+        if (msgLog.isDebugEnabled()) {
+            msgLog.debug("Assigned update version [futId=" + req.futureId() +
+                ", writeVer=" + ver + ']');
         }
 
-        if (req.writeSynchronizationMode() != FULL_ASYNC)
-            req.cleanup(!node.isLocal());
+        assert ver != null : "Got null version for update request: " + req;
+
+        boolean sndPrevVal = false;//!top.rebalanceFinished(req.topologyVersion());
+
+        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+        GridCacheReturn retVal = null;
+
+        UpdateSingleResult updRes = updateSingle(node,
+            hasNear,
+            req,
+            res,
+            locked,
+            ver,
+            null,
+            ctx.isDrEnabled(),
+            null,
+            null,
+            sndPrevVal,
+            stripeIdxs);
+
+        retVal = updRes.returnValue();
+
+        if (retVal == null)
+            retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
 
-        sendTtlUpdateRequest(expiry);
+        res.returnValue(retVal);
+
+        unlockEntries(locked, null);
+
+        GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+
+        if (res0 != null) {
+            fut.onDone();
+
+            completionCb.apply(req, res);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
index 00c9f6c..55c450c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -17,8 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
  *
@@ -28,14 +27,16 @@ public class NearAtomicResponseHelper {
     /** */
     private GridNearAtomicUpdateResponse res;
 
+    private static final AtomicIntegerFieldUpdater<NearAtomicResponseHelper> UPD =
+        AtomicIntegerFieldUpdater.newUpdater(NearAtomicResponseHelper.class, "cnt");
+
     /** */
-    private Set<Integer> stripes;
+    private volatile int cnt;
 
     /**
-     * @param stripes Stripes collection.
      */
-    public NearAtomicResponseHelper(Set<Integer> stripes) {
-        this.stripes = new HashSet<>(stripes);
+    public NearAtomicResponseHelper(int cnt) {
+        this.cnt = cnt;
     }
 
     /**
@@ -43,18 +44,16 @@ public class NearAtomicResponseHelper {
      * @return {@code true} if all responses added.
      */
     public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
-        synchronized (this) {
-            if (res.stripe() == -1)
-                return res;
+        this.res = res;
 
-            if (stripes.remove(res.stripe())) {
-                mergeResponse(res);
+        int c = UPD.decrementAndGet(this);
 
-                return stripes.isEmpty() ? this.res : null;
-            }
+        //mergeResponse(res);
 
-            return null;
-        }
+        if (c == 0)
+            return this.res;
+
+        return null;
     }
 
     /**


[06/10] ignite git commit: ignite-4680 wip

Posted by sb...@apache.org.
ignite-4680 wip


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e7ee08a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e7ee08a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e7ee08a

Branch: refs/heads/ignite-4680-sb
Commit: 3e7ee08a486a7fcdd080c0807433abe22c3f171b
Parents: 855c66b
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Thu Mar 16 15:02:55 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Thu Mar 16 15:02:55 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 32 ++++++++++++++++----
 .../GridNearAtomicAbstractUpdateRequest.java    |  8 +++++
 .../atomic/GridNearAtomicFullUpdateRequest.java |  6 ++--
 .../distributed/near/GridNearAtomicCache.java   |  5 ++-
 4 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e68d72d..973256f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1684,7 +1684,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final int stripeIdx,
         final UpdateReplyClosure completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut;
+
+        if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
+            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+            && req.stripeMap() != null) {
+            int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+
+            List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
+
+            for (int i = 0; i < stripeIdxs.length; i++)
+                keys.add(req.key(stripeIdxs[i]));
+
+            forceFut = preldr.request(keys, req.topologyVersion());
+        }
+        else
+            forceFut = preldr.request(req, req.topologyVersion());
 
         if (forceFut == null || forceFut.isDone()) {
             try {
@@ -1782,8 +1797,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
             && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
-            && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
-            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+            && req.stripeMap() != null) {
+            stripeIdxs = req.stripeMap().get(stripeIdx);
 
             res.stripe(stripeIdx);
         }
@@ -1958,6 +1973,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         if (remap) {
             assert dhtFut == null;
+            res.stripe(-1);
 
             completionCb.apply(req, res);
         }
@@ -2888,8 +2904,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         AffinityTopologyVersion topVer,
         int[] stripeIdxs)
         throws GridDhtInvalidPartitionException {
-        if (req.size() == 1) {
-            KeyCacheObject key = req.key(0);
+
+        int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+        if (keysNum == 1) {
+            int idx = stripeIdxs != null ? stripeIdxs[0] : 0;
+
+            KeyCacheObject key = req.key(idx);
 
             while (true) {
                 GridDhtCacheEntry entry = entryExx(key, topVer);
@@ -2903,7 +2924,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
             List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 0748434..c8e904d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
@@ -443,6 +444,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      */
     public abstract KeyCacheObject key(int idx);
 
+    /**
+     * @return Stripe index to request key indexes map, {@code null} in this base implementation.
+     */
+    @Nullable public Map<Integer, int[]> stripeMap() {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
         return 10;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 5a8c66b..2e619ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -404,10 +404,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         return expiryPlc;
     }
 
-    /**
-     * @return Stripe mapping.
-     */
-    @Nullable public Map<Integer, int[]> stripeMap() {
+    /** {@inheritDoc} */
+    @Override @Nullable public Map<Integer, int[]> stripeMap() {
         return stripeMap;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e7ee08a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 5844021..299386c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -134,8 +133,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         int keyNum;
         int[] stripeIdxs;
 
-        if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
-            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+        if (res.stripe() > -1 && req.stripeMap() != null) {
+            stripeIdxs = req.stripeMap().get(res.stripe());
             keyNum = stripeIdxs.length;
         }
         else {


[02/10] ignite git commit: ignite-4680-2

Posted by sb...@apache.org.
ignite-4680-2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1b4ebd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1b4ebd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1b4ebd4

Branch: refs/heads/ignite-4680-sb
Commit: d1b4ebd48290af5fb3a2d0d9df57219dd4ce1edb
Parents: 8e5e3cb
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Wed Mar 15 19:41:49 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Wed Mar 15 19:41:49 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   6 +-
 .../ignite/internal/IgniteNodeAttributes.java   |   3 +
 .../managers/communication/GridIoManager.java   |  23 +-
 .../cache/GridCacheAffinityManager.java         |  21 +-
 .../processors/cache/GridCacheAtomicFuture.java |   2 +-
 .../cache/GridCacheEvictionManager.java         |   2 +-
 .../processors/cache/GridCacheIoManager.java    |   6 +-
 .../cache/GridCacheManagerAdapter.java          |   2 +-
 .../processors/cache/GridCacheMvccManager.java  |  14 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 184 ++++++++---
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   5 +-
 .../GridNearAtomicAbstractUpdateFuture.java     | 102 +++++-
 .../GridNearAtomicAbstractUpdateRequest.java    |  46 +--
 .../atomic/GridNearAtomicFullUpdateRequest.java | 127 ++++++-
 .../GridNearAtomicSingleUpdateFuture.java       |  25 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  72 ++--
 .../atomic/GridNearAtomicUpdateResponse.java    |  40 ++-
 .../distributed/near/GridNearAtomicCache.java   |  27 +-
 .../cache/version/GridCacheVersionManager.java  |  47 +++
 .../apache/ignite/internal/util/MPSCQueue.java  | 331 +++++++++++++++++++
 .../ignite/internal/util/StripedExecutor.java   | 125 ++++++-
 .../ignite/thread/IgniteStripeThread.java       |  47 +++
 .../GridNearAtomicFullUpdateRequestTest.java    |  74 +++++
 .../dht/atomic/MarshallingAbstractTest.java     | 156 +++++++++
 .../version/GridCacheVersionManagerTest.java    |  86 +++++
 26 files changed, 1380 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e..d37e4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -227,6 +227,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_RESTART_ENABLED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_PORT_RANGE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SPI_CLASS;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
 import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
 import static org.apache.ignite.internal.IgniteVersionUtils.BUILD_TSTAMP_STR;
@@ -1167,7 +1168,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                                 pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
                                 "    ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
                                 sysPoolIdleThreads + ", qSize=" + sysPoolQSize + "]" + NL +
-                                "    ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
+                                "    ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]" + NL +
+                                "    ^-- Striped executor [" + ctx.getStripedExecutorService().getMetrics() + "]";
 
                             log.info(msg);
                         }
@@ -1440,6 +1442,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
 
+        add(ATTR_STRIPES_CNT, ctx.getStripedExecutorService() != null ? ctx.getStripedExecutorService().stripes() : -1);
+
         if (cfg.getConsistentId() != null)
             add(ATTR_NODE_CONSISTENT_ID, cfg.getConsistentId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 8294026..f1a2558 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -168,6 +168,9 @@ public final class IgniteNodeAttributes {
     /** Ignite services compatibility mode (can be {@code null}). */
     public static final String ATTR_SERVICES_COMPATIBILITY_MODE = ATTR_PREFIX + ".services.compatibility.enabled";
 
+    /** Number of stripes of the node's striped executor ({@code -1} if it has none). */
+    public static final String ATTR_STRIPES_CNT = ATTR_PREFIX + ".cache.stripes.cnt";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 23738d7..5fd2845 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -54,11 +54,13 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -97,9 +99,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGF
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
 import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -807,19 +809,32 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         };
 
+        StripedExecutor stripedExecutor = ctx.getStripedExecutorService();
+
         if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
             IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
 
             if (msg0.processFromNioThread())
                 c.run();
             else
-                ctx.getStripedExecutorService().execute(-1, c);
+                stripedExecutor.execute(-1, c);
 
             return;
         }
 
-        if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) {
-            ctx.getStripedExecutorService().execute(msg.partition(), c);
+        if (plc == GridIoPolicy.SYSTEM_POOL &&
+            (msg.partition() != Integer.MIN_VALUE ||
+                msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
+            Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+                ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+
+            if (stripemap != null) {
+                for (Integer stripe : stripemap.keySet()) {
+                    stripedExecutor.execute(stripe, c);
+                }
+            }
+            else
+                stripedExecutor.execute(msg.partition(), c);
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 17c9319..621634c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
@@ -33,12 +38,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
 /**
  * Cache affinity manager.
  */
@@ -88,10 +87,15 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      *
      */
     public void cancelFutures() {
+        if (!starting.get())
+            // Ignoring attempt to stop manager that has never been started.
+            return;
+
         IgniteCheckedException err =
             new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
 
-        aff.cancelFutures(err);
+        if (aff != null)
+            aff.cancelFutures(err);
     }
 
     /** {@inheritDoc} */
@@ -99,7 +103,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
         IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
             "Failed to wait for topology update, client disconnected.");
 
-        aff.cancelFutures(err);
+        if (aff != null)
+            aff.cancelFutures(err);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8df229e..87ae29c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
      * @return Future ID.
      */
-    public Long id();
+    public long id();
 
     /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 06c707e..e76b773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1966,7 +1966,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
             if (lock.readLock().tryLock()) {
                 try {
                     if (log.isDebugEnabled())
-                        log.debug("Entered to eviction future onResponse() [fut=" + this + ", node=" + nodeId +
+                        log.debug("Entered to eviction future storeResponse() [fut=" + this + ", node=" + nodeId +
                             ", res=" + res + ']');
 
                     ClusterNode node = cctx.node(nodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 99878ec..3cf68d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -431,7 +431,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 append(", dhtTxId=").append(dhtTxId(cacheMsg)).
                 append(", msg=").append(cacheMsg);
         }
-        else if (atomicFututeId(cacheMsg) != null) {
+        else if (atomicFututeId(cacheMsg) > -1) {
             builder.append("futId=").append(atomicFututeId(cacheMsg)).
                 append(", writeVer=").append(atomicWriteVersion(cacheMsg)).
                 append(", msg=").append(cacheMsg);
@@ -484,7 +484,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param cacheMsg Cache message.
      * @return Atomic future ID if applicable for message.
      */
-    @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
+    @Nullable private long atomicFututeId(GridCacheMessage cacheMsg) {
         if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
             return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
         else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
@@ -496,7 +496,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest)
             return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
 
-        return null;
+        return -1;
     }
 
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
index 8ad0ea8..ab965de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
@@ -34,7 +34,7 @@ public class GridCacheManagerAdapter<K, V> implements GridCacheManager<K, V> {
     protected IgniteLogger log;
 
     /** Starting flag. */
-    private final AtomicBoolean starting = new AtomicBoolean(false);
+    protected final AtomicBoolean starting = new AtomicBoolean(false);
 
     /** {@inheritDoc} */
     @Override public final void start(GridCacheContext<K, V> cctx) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dff2c88..c4cbefd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -132,7 +132,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     };
 
     /** Logger. */
-    @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private IgniteLogger exchLog;
 
     /** */
@@ -256,9 +256,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
                 if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                    Long futId = cacheFut.id();
+                    long futId = cacheFut.id();
 
-                    if (futId != null)
+                    if (futId > -1)
                         atomicFuts.remove(futId, cacheFut);
                 }
             }
@@ -437,7 +437,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+    public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
         IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
 
         assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
@@ -472,7 +472,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
         return atomicFuts.get(futId);
     }
 
@@ -480,7 +480,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
         return atomicFuts.remove(futId);
     }
 
@@ -511,8 +511,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-
-    /**
      * Adds future.
      *
      * @param fut Future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..17ee298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /** {@inheritDoc} */
-    @Override public final Long id() {
+    public final long id() {
         return futId;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..4ab5000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -101,6 +101,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteStripeThread;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
@@ -278,9 +279,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     UUID nodeId,
                     GridNearAtomicAbstractUpdateRequest req
                 ) {
+                    int stripeIdx;
+
+                    if (req instanceof GridNearAtomicFullUpdateRequest && Thread.currentThread() instanceof IgniteStripeThread)
+                        stripeIdx = ((IgniteStripeThread)Thread.currentThread()).stripeIndex();
+                    else
+                        stripeIdx = IgniteStripeThread.GRP_IDX_UNASSIGNED;
+
                     processNearAtomicUpdateRequest(
                         nodeId,
-                        req);
+                        req,
+                        stripeIdx);
                 }
 
                 @Override public String toString() {
@@ -1640,6 +1649,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Executes local update after preloader fetched values.
+     *
+     * @param nodeId Node ID.
+     * @param req Update request.
+     * @param completionCb Completion callback.
+     */
+    public void updateAllAsyncInternal(
+        final UUID nodeId,
+        final GridNearAtomicAbstractUpdateRequest req,
+        final UpdateReplyClosure completionCb
+    ) {
+        updateAllAsyncInternal(nodeId, req, IgniteStripeThread.GRP_IDX_UNASSIGNED, completionCb);
+    }
+
+    /**
      * Executes local update.
      *
      * @param nodeId Node ID.
@@ -1649,6 +1673,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     void updateAllAsyncInternal(
         final UUID nodeId,
         final GridNearAtomicAbstractUpdateRequest req,
+        final int stripeIdx,
         final UpdateReplyClosure completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
@@ -1667,7 +1692,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 return;
             }
 
-            updateAllAsyncInternal0(nodeId, req, completionCb);
+            updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
         }
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1684,7 +1709,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    updateAllAsyncInternal0(nodeId, req, completionCb);
+                    updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
                 }
             });
         }
@@ -1718,11 +1743,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param nodeId Node ID.
      * @param req Update request.
+     * @param stripeIdx Stripe index.
      * @param completionCb Completion callback.
      */
     private void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicAbstractUpdateRequest req,
+        int stripeIdx,
         UpdateReplyClosure completionCb
     ) {
         ClusterNode node = ctx.discovery().node(nodeId);
@@ -1741,6 +1768,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             ctx.deploymentEnabled());
 
+        res.partition(req.partition());
+
+        int[] stripeIdxs = null;
+
+        if (stripeIdx != IgniteStripeThread.GRP_IDX_UNASSIGNED
+            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+            && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
+            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+
+            res.stripe(stripeIdx);
+        }
+
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
         GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1765,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 try {
                     if (top.stopping()) {
-                        res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
+                        addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
                             "(cache is stopped): " + name()));
 
                         completionCb.apply(req, res);
@@ -1776,7 +1815,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
                     if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        locked = lockEntries(req, req.topologyVersion());
+                        locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
@@ -1795,13 +1834,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
 
-                        dhtFut = createDhtFuture(ver, req);
+                        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+                        dhtFut = createDhtFuture(ver, req, size);
 
                         expiry = expiryPolicy(req.expiry());
 
                         GridCacheReturn retVal = null;
 
-                        if (req.size() > 1 &&                    // Several keys ...
+                        if (size > 1 &&                           // Several keys ...
                             writeThrough() && !req.skipStore() && // and store is enabled ...
                             !ctx.store().isLocal() &&             // and this is not local store ...
                                                                   // (conflict resolver should be used for local store)
@@ -1818,7 +1859,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 ctx.isDrEnabled(),
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             deleted = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
@@ -1837,7 +1879,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 ctx.isDrEnabled(),
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             retVal = updRes.returnValue();
                             deleted = updRes.deleted();
@@ -1895,7 +1938,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            res.addFailedKeys(req.keys(), e);
+            addAllKeysAsFailed(req, res, stripeIdxs, e);
 
             completionCb.apply(req, res);
 
@@ -1921,6 +1964,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Adds all keys as failed to response.
+     *
+     * @param req Request.
+     * @param res Response.
+     * @param stripeIdx Indexes of the request keys to mark as failed, or {@code null} to fail all request keys.
+     * @param e Throwable.
+     */
+    private void addAllKeysAsFailed(GridNearAtomicAbstractUpdateRequest req,
+        GridNearAtomicUpdateResponse res,
+        int[] stripeIdx,
+        Throwable e) {
+
+        if (stripeIdx == null)
+            res.addFailedKeys(req.keys(), e);
+        else {
+            for (int i = 0; i < stripeIdx.length; i++)
+                res.addFailedKey(req.key(stripeIdx[i]), e);
+        }
+    }
+
+    /**
      * Updates locked entries using batched write-through.
      *
      * @param node Sender node.
@@ -1934,6 +1998,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param taskName Task name.
      * @param expiry Expiry policy.
      * @param sndPrevVal If {@code true} sends previous value to backups.
+     * @param stripeIdxs Stripe indexes.
      * @return Deleted entries.
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
@@ -1949,7 +2014,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean replicate,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
-        final boolean sndPrevVal
+        final boolean sndPrevVal,
+        final int[] stripeIdxs
     ) throws GridCacheEntryRemovedException {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1959,13 +2025,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 reloadIfNeeded(locked);
             }
             catch (IgniteCheckedException e) {
-                res.addFailedKeys(req.keys(), e);
+                addAllKeysAsFailed(req, res, stripeIdxs, e);
 
                 return new UpdateBatchResult();
             }
         }
 
-        int size = req.size();
+        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
 
         Map<KeyCacheObject, CacheObject> putMap = null;
 
@@ -1990,6 +2056,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         for (int i = 0; i < locked.size(); i++) {
             GridDhtCacheEntry entry = locked.get(i);
 
+            int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
             try {
                 if (!checkFilter(entry, req, res)) {
                     if (expiry != null && entry.hasValue()) {
@@ -2017,7 +2085,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (op == TRANSFORM) {
-                    EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+                    EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(trueIdx);
 
                     CacheObject old = entry.innerGet(
                         ver,
@@ -2099,7 +2167,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes,
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             firstEntryIdx = i;
 
@@ -2147,7 +2216,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes,
                                 taskName,
                                 expiry,
-                                sndPrevVal);
+                                sndPrevVal,
+                                stripeIdxs);
 
                             firstEntryIdx = i;
 
@@ -2172,12 +2242,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     entryProcessorMap.put(entry.key(), entryProcessor);
                 }
                 else if (op == UPDATE) {
-                    CacheObject updated = req.value(i);
+                    CacheObject updated = req.value(trueIdx);
 
                     if (intercept) {
                         CacheObject old = entry.innerGet(
-                             null,
-                             null,
+                            null,
+                            null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
                             /*metrics*/true,
@@ -2273,7 +2343,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 updRes,
                 taskName,
                 expiry,
-                sndPrevVal);
+                sndPrevVal,
+                stripeIdxs);
         }
         else
             assert filtered.isEmpty();
@@ -2350,6 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param taskName Task name.
      * @param expiry Expiry policy.
      * @param sndPrevVal If {@code true} sends previous value to backups.
+     * @param stripeIdxs Stripe indexes.
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
@@ -2364,7 +2436,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        boolean sndPrevVal
+        boolean sndPrevVal,
+        int[] stripeIdxs
     ) throws GridCacheEntryRemovedException {
         GridCacheReturn retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -2377,9 +2450,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
 
+        int keyNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
         // Avoid iterator creation.
-        for (int i = 0; i < req.size(); i++) {
-            KeyCacheObject k = req.key(i);
+        for (int i = 0; i < keyNum; i++) {
+            int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+            KeyCacheObject k = req.key(trueIdx);
 
             GridCacheOperation op = req.operation();
 
@@ -2388,13 +2465,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             try {
                 GridDhtCacheEntry entry = locked.get(i);
 
-                GridCacheVersion newConflictVer = req.conflictVersion(i);
-                long newConflictTtl = req.conflictTtl(i);
-                long newConflictExpireTime = req.conflictExpireTime(i);
+                GridCacheVersion newConflictVer = req.conflictVersion(trueIdx);
+                long newConflictTtl = req.conflictTtl(trueIdx);
+                long newConflictExpireTime = req.conflictExpireTime(trueIdx);
 
                 assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
-                Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
+                Object writeVal = op == TRANSFORM ? req.entryProcessor(trueIdx) : req.writeValue(trueIdx);
 
                 Collection<UUID> readers = null;
                 Collection<UUID> filteredReaders = null;
@@ -2478,13 +2555,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue()) {
-                                res.addNearValue(i,
+                                res.addNearValue(trueIdx,
                                     updRes.newValue(),
                                     updRes.newTtl(),
                                     updRes.conflictExpireTime());
                             }
                             else
-                                res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
+                                res.addNearTtl(trueIdx, updRes.newTtl(), updRes.conflictExpireTime());
 
                             if (updRes.newValue() != null) {
                                 IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2495,10 +2572,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
                             entry.removeReader(nearNode.id(), req.messageId());
                         else
-                            res.addSkippedIndex(i);
+                            res.addSkippedIndex(trueIdx);
                     }
                     else
-                        res.addSkippedIndex(i);
+                        res.addSkippedIndex(trueIdx);
                 }
 
                 if (updRes.removeVersion() != null) {
@@ -2548,7 +2625,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /**
      * @param hasNear {@code True} if originating node has near cache.
-     * @param firstEntryIdx Index of the first entry in the request keys collection.
+     * @param firstEntryIdx Index of the first entry in the request keys collection or in the stripeIdxs.
      * @param entries Entries to update.
      * @param ver Version to set.
      * @param nearNode Originating node.
@@ -2584,7 +2661,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final UpdateBatchResult batchRes,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
-        final boolean sndPrevVal
+        final boolean sndPrevVal,
+        final int[] stripeIdxs
     ) {
         assert putMap == null ^ rmvKeys == null;
 
@@ -2737,17 +2815,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (hasNear) {
+                        // Index of this entry within the full list of request keys.
+                        int trueIdx = stripeIdxs == null ? firstEntryIdx + i : stripeIdxs[firstEntryIdx + i];
+
                         if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
                             int idx = firstEntryIdx + i;
 
                             if (req.operation() == TRANSFORM) {
-                                res.addNearValue(idx,
+                                res.addNearValue(trueIdx,
                                     writeVal,
                                     updRes.newTtl(),
                                     CU.EXPIRE_TIME_CALCULATE);
                             }
                             else
-                                res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+                                res.addNearTtl(trueIdx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
 
                             if (writeVal != null || entry.hasValue()) {
                                 IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2758,7 +2839,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         else if (readers.contains(nearNode.id())) // Reader became primary or backup.
                             entry.removeReader(nearNode.id(), req.messageId());
                         else
-                            res.addSkippedIndex(firstEntryIdx + i);
+                            res.addSkippedIndex(trueIdx);
                     }
                 }
                 catch (GridCacheEntryRemovedException e) {
@@ -2789,12 +2870,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param req Request with keys to lock.
      * @param topVer Topology version to lock on.
+     * @param stripeIdxs Indexes of the request keys to lock, or {@code null} to lock all request keys.
      * @return Collection of locked entries.
      * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown,
      *      locks are released.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer)
+    private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req,
+        AffinityTopologyVersion topVer,
+        int[] stripeIdxs)
         throws GridDhtInvalidPartitionException {
         if (req.size() == 1) {
             KeyCacheObject key = req.key(0);
@@ -2811,11 +2895,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            List<GridDhtCacheEntry> locked = new ArrayList<>(req.size());
+            int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+            List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
 
             while (true) {
-                for (int i = 0; i < req.size(); i++) {
-                    GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+                for (int i = 0; i < keysNum; i++) {
+                    int idx = stripeIdxs == null ? i : stripeIdxs[i];
+
+                    GridDhtCacheEntry entry = entryExx(req.key(idx), topVer);
 
                     locked.add(entry);
                 }
@@ -3003,25 +3090,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        int size
     ) {
-        if (updateReq.size() == 1)
+        if (size == 1)
             return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
         else
-            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+            return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, size);
     }
 
     /**
      * @param nodeId Sender node ID.
+     * @param stripeIdx Stripe number.
      * @param req Near atomic update request.
      */
-    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+    private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req, int stripeIdx) {
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
-                ", node=" + nodeId + ']');
+                ", node=" + nodeId + ", stripe=" + stripeIdx + ']');
         }
 
-        updateAllAsyncInternal(nodeId, req, updateReplyClos);
+        updateAllAsyncInternal(nodeId, req, stripeIdx, updateReplyClos);
     }
 
     /**
@@ -3031,7 +3120,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (msgLog.isDebugEnabled())
-            msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
+            msgLog.debug("Received near atomic update response " +
+                "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() + ']');
 
         res.nodeId(ctx.localNodeId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..af9fd8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,11 +49,12 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
         GridCacheVersion writeVer,
-        GridNearAtomicAbstractUpdateRequest updateReq
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        int size
     ) {
         super(cctx, writeVer, updateReq);
 
-        mappings = U.newHashMap(updateReq.size());
+        mappings = U.newHashMap(size);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..770999a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -138,7 +139,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
     /** Future ID. */
     @GridToStringInclude
-    protected Long futId;
+    protected long futId = -1;
 
     /** Operation result. */
     protected GridCacheReturn opRes;
@@ -356,7 +357,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @return Response to notify about primary failure.
      */
     final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
-        assert req.response() == null : req;
         assert req.nodeId() != null : req;
 
         if (msgLog.isDebugEnabled()) {
@@ -423,6 +423,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         final GridNearAtomicAbstractUpdateRequest req;
 
         /** */
+        private GridNearAtomicUpdateResponse res;
+
+        /** */
+        private Map<Integer, GridNearAtomicUpdateResponse> resMap;
+
+        /** */
         @GridToStringInclude
         Set<UUID> dhtNodes;
 
@@ -431,7 +437,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private Set<UUID> rcvd;
 
         /** */
-        private boolean hasRes;
+        private boolean hasDhtRes;
 
         /**
          * @param req Request.
@@ -516,13 +522,24 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
 
         /**
+         * @return {@code True} if all near results gathered.
+         */
+        private boolean hasAllNearResults() {
+            return res != null || (resMap != null && resMap.size() == req.stripes());
+        }
+
+        public String state() {
+            return resMap != null ? resMap.size() + "/" + req.stripes() : res != null ? "res" : "no";
+        }
+
+        /**
          * @return {@code True} if all expected responses are received.
          */
         private boolean finished() {
             if (req.writeSynchronizationMode() == PRIMARY_SYNC)
-                return hasRes;
+                return hasAllNearResults();
 
-            return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
+            return (dhtNodes != null && dhtNodes.isEmpty()) && hasDhtRes;
         }
 
         /**
@@ -536,13 +553,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
              * When primary failed, even if primary response is received, it is possible it failed to send
              * request to backup(s), need remap operation.
              */
-            if (req.fullSync() && !req.nodeFailedResponse()) {
-                req.resetResponse();
+            if (req.fullSync() && !nodeFailedResponse()) {
+                resetResponse();
 
                 return req;
             }
 
-            return req.response() == null ? req : null;
+            return hasAllNearResults() ? req : null;
         }
 
         /**
@@ -559,7 +576,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             if (finished())
                 return null;
 
-            return req.response() == null ? req : null;
+            return !hasAllNearResults() ? req : null;
         }
 
         /**
@@ -571,7 +588,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return DhtLeftResult.NOT_DONE;
 
             if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
-                if (hasRes)
+                if (hasDhtRes)
                     return DhtLeftResult.DONE;
                 else
                     return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
@@ -592,7 +609,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return false;
 
             if (res.hasResult())
-                hasRes = true;
+                hasDhtRes = true;
 
             if (dhtNodes == null) {
                 if (rcvd == null)
@@ -614,9 +631,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
             assert !finished() : this;
 
-            hasRes = true;
-
-            boolean onRes = req.onResponse(res);
+            boolean onRes = storeResponse(res);
 
             assert onRes;
 
@@ -639,7 +654,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @param cctx Context.
          */
         private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
-            assert dhtNodes == null || req.initMappingLocally();
+            assert F.isEmpty(dhtNodes) || req.initMappingLocally();
 
             Set<UUID> dhtNodes0 = dhtNodes;
 
@@ -664,12 +679,67 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 dhtNodes = Collections.emptySet();
         }
 
+        /**
+         * @return {@code True} if a response is stored and it is a notification that the primary node left.
+         */
+        boolean nodeFailedResponse() {
+            return res != null && res.nodeLeftResponse();
+        }
+
+        /**
+         * @param res Response; stored per stripe if {@code res.stripe() > -1}, otherwise as the single response.
+         * @return {@code True} if stored, {@code false} if a response for that stripe (or the single one) already exists.
+         */
+        private boolean storeResponse(GridNearAtomicUpdateResponse res) {
+            if (res.stripe() > -1) {
+                if (resMap == null)
+                    resMap = U.newHashMap(req.stripes());
+
+                if (!resMap.containsKey(res.stripe())) {
+                    resMap.put(res.stripe(), res);
+                    return true;
+                }
+            }
+            else {
+                if (this.res == null) {
+                    this.res = res;
+//                    this.resMap = null;
+
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        /**
+         * Clears both the single stored response and the per-stripe response map.
+         */
+        void resetResponse() {
+            res = null;
+            resMap = null;
+        }
+
+        /**
+         * @return Single (non-striped) response, or {@code null} if none is stored.
+         */
+        @Nullable public GridNearAtomicUpdateResponse response() {
+            return res;
+        }
+
+        /**
+         * @return Responses keyed by stripe number, or {@code null} if no striped response is stored.
+         */
+        @Nullable public Map<Integer, GridNearAtomicUpdateResponse> responses() {
+            return resMap;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(PrimaryRequestState.class, this,
                 "primary", primaryId(),
                 "needPrimaryRes", req.needPrimaryResponse(),
-                "primaryRes", req.response() != null,
+                "primaryRes", this.res != null,
                 "done", finished());
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..b549370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -89,10 +89,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     @GridToStringExclude
     protected byte flags;
 
-    /** */
-    @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
-
     /**
      *
      */
@@ -249,41 +245,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @param res Response.
-     * @return {@code True} if current response was {@code null}.
-     */
-    public boolean onResponse(GridNearAtomicUpdateResponse res) {
-        if (this.res == null) {
-            this.res = res;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     *
-     */
-    void resetResponse() {
-        this.res = null;
-    }
-
-    /**
-     * @return Response.
-     */
-    @Nullable public GridNearAtomicUpdateResponse response() {
-        return res;
-    }
-
-    /**
-     * @return {@code True} if received notification about primary fail.
-     */
-    boolean nodeFailedResponse() {
-        return res != null && res.nodeLeftResponse();
-    }
-
-    /**
      * @return Topology locked flag.
      */
     final boolean topologyLocked() {
@@ -451,6 +412,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     public abstract int size();
 
     /**
+     * @return Number of stripes; this base implementation always returns {@code 0}.
+     */
+    public int stripes() {
+        return 0;
+    }
+
+    /**
      * @param idx Key index.
      * @return Key.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c381333..5a8c66b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -21,13 +21,16 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -40,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,6 +63,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     /** */
     private static final long serialVersionUID = 0L;
 
+    public static final int DIRECT_TYPE = 40;
+
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -70,6 +74,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
+    /** Number of key indexes recorded so far for each stripe. */
+    @GridDirectTransient
+    private int[] stripeCnt;
+
+    /** Stripe to key indexes mapping (indexes into {@code keys}). */
+    @GridDirectMap(keyType = int.class, valueType = int[].class)
+    private Map<Integer, int[]> stripeMap;
+
     /** Entry processors. */
     @GridDirectTransient
     private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -109,6 +121,17 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectTransient
     private int initSize;
 
+    /** Maximum number of keys. */
+    @GridDirectTransient
+    private int maxEntryCnt;
+
+    /** Number of stripes on remote node. */
+    @GridDirectTransient
+    private int maxStripes;
+
+    /** Partition ID. */
+    private int partId;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -155,7 +178,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         boolean skipStore,
         boolean keepBinary,
         boolean addDepInfo,
-        int maxEntryCnt
+        int maxEntryCnt,
+        int maxStripes
     ) {
         super(cacheId,
             nodeId,
@@ -181,6 +205,9 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         // than 10, we use it.
         initSize = Math.min(maxEntryCnt, 10);
 
+        this.maxEntryCnt = maxEntryCnt;
+        this.maxStripes = maxStripes;
+
         keys = new ArrayList<>(initSize);
     }
 
@@ -200,6 +227,25 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
         assert val != null || op == DELETE;
 
+        if (maxStripes > 1) {
+            int stripe = key.partition() % maxStripes;
+
+            if (stripeCnt == null)
+                stripeCnt = new int[maxStripes];
+
+            if (stripeMap == null)
+                stripeMap = new HashMap<>(maxStripes);
+
+            int[] idxs = stripeMap.get(stripe);
+
+            if (idxs == null)
+                stripeMap.put(stripe, idxs = new int[maxEntryCnt]);
+
+            int idx = stripeCnt[stripe];
+            idxs[idx] = keys.size();
+            stripeCnt[stripe] = idx + 1;
+        }
+
         keys.add(key);
 
         if (entryProcessor != null) {
@@ -267,6 +313,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
+    @Override public int stripes() {
+        return stripeMap == null ? 0 : stripeMap.size(); // Map is created lazily, only once a key is striped.
+    }
+
+    /** {@inheritDoc} */
     @Override public KeyCacheObject key(int idx) {
         return keys.get(idx);
     }
@@ -353,6 +404,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         return expiryPlc;
     }
 
+    /**
+     * @return Stripe to key indexes mapping, or {@code null} if it was never initialized.
+     */
+    @Nullable public Map<Integer, int[]> stripeMap() {
+        return stripeMap;
+    }
+
     /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
@@ -362,6 +420,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         if (expiryPlc != null && expiryPlcBytes == null)
             expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
 
+        if (stripeMap != null && stripeCnt != null) {
+            for (Integer idx : stripeMap.keySet()) {
+                stripeMap.put(idx, Arrays.copyOf(stripeMap.get(idx), stripeCnt[idx]));
+            }
+
+            stripeCnt = null;
+        }
+
         prepareMarshalCacheObjects(keys, cctx);
 
         if (filter != null) {
@@ -425,9 +491,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        assert !F.isEmpty(keys);
+        return partId;
+    }
 
-        return keys.get(0).partition();
+    /**
+     * @param partId Partition ID that {@link #partition()} will return.
+     */
+    public void partition(int partId) {
+        this.partId = partId;
     }
 
     /** {@inheritDoc} */
@@ -494,6 +565,18 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 writer.incrementState();
 
             case 18:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+            case 19:
+                if (!writer.writeMap("stripeMap", stripeMap, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -580,6 +663,22 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 18:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
+                stripeMap = reader.readMap("stripeMap", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -594,24 +693,24 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public void cleanup(boolean clearKeys) {
-        vals = null;
-        entryProcessors = null;
-        entryProcessorsBytes = null;
-        invokeArgs = null;
-        invokeArgsBytes = null;
-
-        if (clearKeys)
-            keys = null;
+//            vals = null;
+//            entryProcessors = null;
+//            entryProcessorsBytes = null;
+//            invokeArgs = null;
+//            invokeArgsBytes = null;
+//
+//            if (clearKeys)
+//                keys = null;
     }
 
     /** {@inheritDoc} */
     @Override public byte directType() {
-        return 40;
+        return DIRECT_TYPE;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 19;
+        return 21;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 930c4af..2b6d2c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             assert reqState != null;
@@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         CachePartialUpdateCheckedException err0 = null;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             req = reqState.processPrimaryResponse(nodeId, res);
@@ -331,7 +331,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Non-null topology version if update should be remapped.
      */
     private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert futId != -1;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -362,7 +362,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             cctx.mvcc().removeAtomicFuture(futId);
 
             reqState = null;
-            futId = null;
+            futId = -1;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -488,7 +488,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             reqState0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == -1 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -530,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param futId
      * @return
      */
-    private boolean checkDhtNodes(Long futId) {
+    private boolean checkDhtNodes(long futId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
@@ -538,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridNearAtomicCheckUpdateRequest checkReq = null;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == -1 || this.futId != futId)
                 return false;
 
             assert reqState != null;
@@ -568,13 +568,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     /**
      * @return Future ID.
      */
-    private Long onFutureDone() {
+    private long onFutureDone() {
         Long id0;
 
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = -1;
         }
 
         return id0;
@@ -694,6 +694,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 skipStore,
                 keepBinary,
                 cctx.deploymentEnabled(),
+                1,
                 1);
         }
 
@@ -721,9 +722,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         }
 
         if (nearEnabled) {
-            assert reqState.req.response() != null;
+            assert reqState.response() != null;
 
-            updateNear(reqState.req, reqState.req.response());
+            updateNear(reqState.req, reqState.response());
         }
 
         onDone(opRes, err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..9c4de66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -57,6 +57,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
@@ -150,7 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -167,7 +168,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
         synchronized (mux) {
-            if (futId == null)
+            if (futId == -1)
                 return false;
 
             if (singleReq != null) {
@@ -282,10 +283,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            Long futId = onFutureDone();
+            long futVer = onFutureDone();
 
-            if (futId != null)
-                cctx.mvcc().removeAtomicFuture(futId);
+            if (futVer > -1)
+                cctx.mvcc().removeAtomicFuture(futVer);
 
             return true;
         }
@@ -300,7 +301,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             PrimaryRequestState reqState;
@@ -370,10 +371,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        boolean rcvAll;
+        boolean rcvAll = false;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == -1 || futId != res.futureId())
                 return;
 
             if (singleReq != null) {
@@ -408,6 +409,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                         rcvAll = false;
                     }
+                    if (msgLog.isDebugEnabled())
+                        msgLog.debug("Processed near atomic update response " +
+                            "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() +
+                            ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") +
+                            " " + reqState.state() + ']');
                 }
                 else
                     return;
@@ -466,11 +472,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = reqState.req.response();
+                    if (reqState.responses() != null) {
+                        for (GridNearAtomicUpdateResponse res0 : reqState.responses().values()) {
+                            updateNear(reqState.req, res0);
+                        }
+                    }
+                    else {
+                        GridNearAtomicUpdateResponse res0 = reqState.response();
 
-                    assert res0 != null : reqState;
+                        assert res0 != null : reqState;
 
-                    updateNear(reqState.req, res0);
+                        updateNear(reqState.req, res0);
+                    }
                 }
             }
             else if (!nodeErr)
@@ -534,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Non null topology version if update should be remapped.
      */
     @Nullable private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert futId != -1;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -577,7 +590,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (remapTopVer0 != null) {
             cctx.mvcc().removeAtomicFuture(futId);
 
-            futId = null;
+            futId = -1;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -596,7 +609,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = reqState.req.response();
+                    GridNearAtomicUpdateResponse res0 = reqState.response();
 
                     assert res0 != null : reqState;
 
@@ -604,9 +617,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 }
             }
             else {
-                assert singleReq != null && singleReq.req.response() != null;
+                assert singleReq != null && singleReq.response() != null;
 
-                updateNear(singleReq.req, singleReq.req.response());
+                updateNear(singleReq.req, singleReq.response());
             }
         }
 
@@ -767,7 +780,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        Long futId = cctx.mvcc().atomicFutureId();
+        long futId = cctx.mvcc().atomicFutureId();
 
         Exception err = null;
         PrimaryRequestState singleReq0 = null;
@@ -801,7 +814,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == -1 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -866,7 +879,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         boolean rcvAll = false;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == -1 || this.futId != futId)
                 return;
 
             if (singleReq != null) {
@@ -934,13 +947,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /**
      * @return Future version.
      */
-    private Long onFutureDone() {
-        Long id0;
+    private long onFutureDone() {
+        long id0;
 
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = -1;
         }
 
         return id0;
@@ -957,7 +970,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
-        Long futId,
+        long futId,
         @Nullable Collection<KeyCacheObject> remapKeys,
         boolean mappingKnown) throws Exception {
         Iterator<?> it = null;
@@ -977,6 +990,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
 
+        int part = (int)(futId & 0xFFFF);
+
         // Create mappings first, then send messages.
         for (Object key : keys) {
             if (key == null)
@@ -1045,6 +1060,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             PrimaryRequestState mapped = pendingMappings.get(nodeId);
 
             if (mapped == null) {
+                int stripes = primary.attribute(ATTR_STRIPES_CNT) != null ? (int)primary.attribute(ATTR_STRIPES_CNT) : -1;
+
                 GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
                     cctx.cacheId(),
                     nodeId,
@@ -1063,7 +1080,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     skipStore,
                     keepBinary,
                     cctx.deploymentEnabled(),
-                    keys.size());
+                    keys.size(),
+                    stripes
+                );
+
+                req.partition(part);
 
                 mapped = new PrimaryRequestState(req, nodes, false);
 
@@ -1086,7 +1107,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Request.
      * @throws Exception If failed.
      */
-    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId, boolean mappingKnown)
         throws Exception {
         Object key = F.first(keys);
 
@@ -1168,6 +1189,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             skipStore,
             keepBinary,
             cctx.deploymentEnabled(),
+            1,
             1);
 
         req.addUpdateEntry(cacheKey,

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 4e20fc7..f8b45d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -96,6 +96,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Partition ID. */
     private int partId = -1;
 
+    /** Stripe. */
+    private int stripe = -1;
+
     /** */
     @GridDirectCollection(UUID.class)
     @GridToStringInclude
@@ -182,6 +185,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
+     * @param partId Partition ID for proper striping on near node.
+     */
+    public void partition(int partId) {
+        this.partId = partId;
+    }
+
+    /**
      * Sets update error.
      *
      * @param err Error.
@@ -427,6 +437,20 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
         return partId;
     }
 
+    /**
+     * @return Stripe number, or {@code -1} if it was not set.
+     */
+    public int stripe() {
+        return stripe;
+    }
+
+    /**
+     * @param stripe Stripe number.
+     */
+    public void stripe(int stripe) {
+        this.stripe = stripe;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean addDeploymentInfo() {
         return addDepInfo;
@@ -524,6 +548,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 writer.incrementState();
 
+            case 15:
+                if (!writer.writeInt("stripe", stripe))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -636,6 +666,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
+            case 15:
+                stripe = reader.readInt("stripe");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
@@ -648,7 +686,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 16;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 62aecd1..5844021 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -130,7 +131,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res
     ) {
-        if (F.size(res.failedKeys()) == req.size())
+        int keyNum;
+        int[] stripeIdxs;
+
+        if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
+            stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+            keyNum = stripeIdxs.length;
+        }
+        else {
+            stripeIdxs = null;
+            keyNum = req.keys().size();
+        }
+
+        if (F.size(res.failedKeys()) == keyNum)
             return;
 
         /*
@@ -150,11 +163,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        for (int i = 0; i < req.size(); i++) {
-            if (F.contains(skipped, i))
+        for (int i = 0; i < keyNum; i++) {
+            int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+            if (F.contains(skipped, trueIdx))
                 continue;
 
-            KeyCacheObject key = req.key(i);
+            KeyCacheObject key = req.key(trueIdx);
 
             if (F.contains(failed, key))
                 continue;
@@ -170,7 +185,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
             CacheObject val = null;
 
-            if (F.contains(nearValsIdxs, i)) {
+            if (F.contains(nearValsIdxs, trueIdx)) {
                 val = res.nearValue(nearValIdx);
 
                 nearValIdx++;
@@ -179,7 +194,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 assert req.operation() != TRANSFORM;
 
                 if (req.operation() != DELETE)
-                    val = req.value(i);
+                    val = req.value(trueIdx);
             }
 
             long ttl = res.nearTtl(i);


[07/10] ignite git commit: tmp

Posted by sb...@apache.org.
tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f518395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f518395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f518395

Branch: refs/heads/ignite-4680-sb
Commit: 5f51839525a839c1eec9c28aa7772cc9f1bc59c1
Parents: 3e7ee08
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 15:54:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 15:54:56 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 33 ++++++++++++++++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 15 ++++++---
 .../atomic/GridNearAtomicFullUpdateRequest.java |  1 +
 .../apache/ignite/internal/util/MPSCQueue.java  |  8 ++---
 4 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 39c514b..6dad30b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -55,11 +55,13 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearAtomicResponseHelper;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
+import org.apache.ignite.internal.util.MPSCQueue;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -201,6 +203,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
     };
 
+    private Thread resThread;
+
+    private MPSCQueue<Runnable> q;
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -221,6 +227,26 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         synchronized (sysLsnrsMux) {
             sysLsnrs = new GridMessageListener[GridTopic.values().length];
         }
+
+        resThread = new Thread() {
+            public void run() {
+                while (true) {
+                    try {
+                        Runnable r = q.take();
+
+                        r.run();
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+
+        q = new MPSCQueue<>(resThread);
+
+        resThread.setDaemon(true);
+        resThread.start();
     }
 
     /**
@@ -823,6 +849,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return;
         }
 
+//        if (msg.message() instanceof GridNearAtomicUpdateResponse) {
+//            q.add(c);
+//
+//            return;
+//        }
+
+
         if (plc == GridIoPolicy.SYSTEM_POOL &&
             (msg.partition() != Integer.MIN_VALUE ||
                 msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 973256f..dcc79d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1761,6 +1761,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         completionCb.apply(req, res);
     }
 
+    private GridCacheVersion ver;
+
+
     /**
      * Executes local update after preloader fetched values.
      *
@@ -1837,13 +1840,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                    if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
                         // Assign next version for update inside entries lock.
-                        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+                        if (ver == null)
+                            ver = ctx.versions().next(top.topologyVersion());
 
                         if (hasNear)
                             res.nearVersion(ver);
@@ -1859,7 +1863,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
 
-                        dhtFut = createDhtFuture(ver, req, size);
+                        dhtFut = null;//createDhtFuture(ver, req, size);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -1977,9 +1981,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             completionCb.apply(req, res);
         }
-        else
+        else {
             if (dhtFut != null)
                 dhtFut.map(node, res.returnValue(), res, completionCb);
+            else
+                completionCb.apply(req, res);
+        }
 
         if (req.writeSynchronizationMode() != FULL_ASYNC)
             req.cleanup(!node.isLocal());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 2e619ee..ce6035e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -406,6 +406,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override @Nullable public Map<Integer, int[]> stripeMap() {
+        //stripeMap = null;
         return stripeMap;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
index 5505b3a..5725390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
@@ -41,7 +41,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     /** */
     final AtomicReference<Node> putStack = new AtomicReference<>();
     /** */
-    private final AtomicInteger takeStackSize = new AtomicInteger();
+    //private final AtomicInteger takeStackSize = new AtomicInteger();
 
     /** */
     private Thread consumerThread;
@@ -189,7 +189,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     private void dequeue() {
         takeStack[takeStackIndex] = null;
         takeStackIndex++;
-        takeStackSize.lazySet(takeStackSize.get() - 1);
+        //takeStackSize.lazySet(takeStackSize.get() - 1);
     }
 
     /** */
@@ -248,7 +248,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     private void copyIntoTakeStack(Node putStackHead) {
         int putStackSize = putStackHead.size;
 
-        takeStackSize.lazySet(putStackSize);
+        //takeStackSize.lazySet(putStackSize);
 
         if (putStackSize > takeStack.length)
             takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
@@ -270,7 +270,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     @Override public int size() {
         Node h = putStack.get();
         int putStackSize = h == null ? 0 : h.size;
-        return putStackSize + takeStackSize.get();
+        return putStackSize + 0;//takeStackSize.get();
     }
 
     /** {@inheritDoc}. */


[10/10] ignite git commit: tmp

Posted by sb...@apache.org.
tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e59edc93
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e59edc93
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e59edc93

Branch: refs/heads/ignite-4680-sb
Commit: e59edc93098eb63457953cf92b5ec4bad789c0ca
Parents: ae9737b
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 18:00:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 18:00:24 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 12 ++------
 .../atomic/GridNearAtomicFullUpdateRequest.java | 32 +++-----------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  4 ---
 3 files changed, 6 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e59edc93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bcfea79..f2a251d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -212,9 +212,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         updateReplyClos = new UpdateReplyClosure() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
-                if (req.writeSynchronizationMode() != FULL_ASYNC) {
+                if (req.writeSynchronizationMode() != FULL_ASYNC)
                     sendNearUpdateReply(res.nodeId(), res);
-                }
                 else {
                     if (res.remapTopologyVersion() != null)
                         // Remap keys on primary node in FULL_ASYNC mode.
@@ -280,17 +279,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     UUID nodeId,
                     GridNearAtomicAbstractUpdateRequest req
                 ) {
-                    int stripeIdx;
-                    Thread curTrd = Thread.currentThread();
-                    if (req instanceof GridNearAtomicFullUpdateRequest && curTrd instanceof IgniteThread)
-                        stripeIdx = ((IgniteThread)curTrd).stripe();
-                    else
-                        stripeIdx = IgniteThread.GRP_IDX_UNASSIGNED;
-
                     processNearAtomicUpdateRequest(
                         nodeId,
                         req,
-                        stripeIdx);
+                        -1);
                 }
 
                 @Override public String toString() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59edc93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index ce6035e..2b461ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -129,9 +129,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectTransient
     private int maxStripes;
 
-    /** Partition Id */
-    private int partId;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -490,14 +487,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public int partition() {
-        return partId;
-    }
-
-    /**
-     * @param partId Partition.
-     */
-    public void partition(int partId) {
-        this.partId = partId;
+        return keys.get(0).partition();
     }
 
     /** {@inheritDoc} */
@@ -564,18 +554,12 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeInt("partId", partId))
-                    return false;
-
-                writer.incrementState();
-
-            case 19:
                 if (!writer.writeMap("stripeMap", stripeMap, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 20:
+            case 19:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -662,14 +646,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 18:
-                partId = reader.readInt("partId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 19:
                 stripeMap = reader.readMap("stripeMap", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
@@ -677,7 +653,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
                 reader.incrementState();
 
-            case 20:
+            case 19:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -709,7 +685,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 21;
+        return 20;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59edc93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 8774d5f..2685ef6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -982,8 +982,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
 
-        int part = (int)(futId & 0xFFFF);
-
         // Create mappings first, then send messages.
         for (Object key : keys) {
             if (key == null)
@@ -1076,8 +1074,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     stripes
                 );
 
-                req.partition(part);
-
                 mapped = new PrimaryRequestState(req, nodes, false);
 
                 pendingMappings.put(nodeId, mapped);