You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/05 04:19:54 UTC

[01/29] incubator-ignite git commit: # ignite-970

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-389-ipc 7ee51ba05 -> 6b51f99e7


# ignite-970


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/104a13fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/104a13fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/104a13fd

Branch: refs/heads/ignite-389-ipc
Commit: 104a13fd2118804e42b5035df4340d7374c36e82
Parents: 1dbdd42
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 14:25:08 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 14:25:08 2015 +0300

----------------------------------------------------------------------
 .../util/nio/GridShmemCommunicationClient.java  | 151 +++++++
 .../communication/tcp/TcpCommunicationSpi.java  | 414 ++++++++++++++++++-
 .../tcp/TcpCommunicationSpiMBean.java           |   8 +
 .../IgniteCacheMessageRecoveryAbstractTest.java |   1 +
 .../spi/GridTcpSpiForwardingSelfTest.java       |   1 +
 .../GridTcpCommunicationSpiAbstractTest.java    |  13 +
 ...mmunicationSpiConcurrentConnectSelfTest.java |   4 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |  11 +-
 ...pCommunicationSpiMultithreadedShmemTest.java |  28 ++
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   1 +
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   1 +
 .../GridTcpCommunicationSpiShmemSelfTest.java   |  38 ++
 .../tcp/GridTcpCommunicationSpiTcpSelfTest.java |   7 +
 .../IgniteSpiCommunicationSelfTestSuite.java    |   2 +
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |  13 +
 ...oopSecondaryFileSystemConfigurationTest.java |  14 +
 .../hadoop/HadoopAbstractSelfTest.java          |   6 +
 17 files changed, 695 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
new file mode 100644
index 0000000..f3dc46f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class GridShmemCommunicationClient extends GridAbstractCommunicationClient {
+    /** */
+    private final IpcSharedMemoryClientEndpoint shmem;
+
+    /** */
+    private final ByteBuffer writeBuf;
+
+    /** */
+    private final MessageFormatter formatter;
+
+    /**
+     * @param metricsLsnr Metrics listener.
+     * @param port Shared memory IPC server port.
+     * @param connTimeout Connection timeout.
+     * @param log Logger.
+     * @param formatter Message formatter.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr,
+        int port,
+        long connTimeout,
+        IgniteLogger log,
+        MessageFormatter formatter)
+        throws IgniteCheckedException
+    {
+        super(metricsLsnr);
+
+        assert metricsLsnr != null;
+        assert port > 0 && port < 0xffff;
+        assert connTimeout >= 0;
+
+        shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log);
+
+        writeBuf = ByteBuffer.allocate(8 << 10);
+
+        writeBuf.order(ByteOrder.nativeOrder());
+
+        this.formatter = formatter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
+        throws IgniteCheckedException {
+        handshakeC.applyx(shmem.inputStream(), shmem.outputStream());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean close() {
+        boolean res = super.close();
+
+        if (res)
+            shmem.close();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void forceClose() {
+        super.forceClose();
+
+        // Do not call forceClose() here.
+        shmem.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void sendMessage(byte[] data, int len) throws IgniteCheckedException {
+        if (closed())
+            throw new IgniteCheckedException("Communication client was closed: " + this);
+
+        try {
+            shmem.outputStream().write(data, 0, len);
+
+            metricsLsnr.onBytesSent(len);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+        }
+
+        markUsed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg)
+        throws IgniteCheckedException {
+        if (closed())
+            throw new IgniteCheckedException("Communication client was closed: " + this);
+
+        assert writeBuf.hasArray();
+
+        try {
+            int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer());
+
+            metricsLsnr.onBytesSent(cnt);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+        }
+
+        markUsed();
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void flushIfNeeded(long timeout) throws IOException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridShmemCommunicationClient.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 19e54c8..3768db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -25,15 +25,19 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -139,6 +143,10 @@ import static org.apache.ignite.events.EventType.*;
 @IgniteSpiConsistencyChecked(optional = false)
 public class TcpCommunicationSpi extends IgniteSpiAdapter
     implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+    /** IPC error message. */
+    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
+        "(switching to TCP, may be slower).";
+
     /** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */
     public static final String ATTR_ADDRS = "comm.tcp.addrs";
 
@@ -148,12 +156,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Node attribute that is mapped to node port number (value is <tt>comm.tcp.port</tt>). */
     public static final String ATTR_PORT = "comm.tcp.port";
 
+    /** Node attribute that is mapped to node port number (value is <tt>comm.shmem.tcp.port</tt>). */
+    public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";
+
     /** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
 
     /** Default port which node sets listener to (value is <tt>47100</tt>). */
     public static final int DFLT_PORT = 47100;
 
+    /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */
+    public static final int DFLT_SHMEM_PORT = 48100;
+
     /** Default idle connection timeout (value is <tt>30000</tt>ms). */
     public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
 
@@ -293,7 +307,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     assert ses.accepted();
 
                     if (msg instanceof NodeIdMessage)
-                        sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+                        sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
                     else {
                         assert msg instanceof HandshakeMessage : msg;
 
@@ -322,6 +336,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     GridCommunicationClient oldClient = clients.get(sndId);
 
+                    boolean hasShmemClient = false;
+
                     if (oldClient != null) {
                         if (oldClient instanceof GridTcpNioCommunicationClient) {
                             if (log.isDebugEnabled())
@@ -333,6 +349,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             return;
                         }
+                        else {
+                            assert oldClient instanceof GridShmemCommunicationClient;
+
+                            hasShmemClient = true;
+                        }
                     }
 
                     GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
@@ -359,10 +380,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                                 return;
                             }
+                            else {
+                                assert oldClient instanceof GridShmemCommunicationClient;
+
+                                hasShmemClient = true;
+                            }
                         }
 
                         boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
-                            new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut));
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
 
                         if (log.isDebugEnabled())
                             log.debug("Received incoming connection from remote node " +
@@ -371,7 +397,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         if (reserved) {
                             try {
                                 GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, msg0.received(), true);
+                                    connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
 
                                 fut.onDone(client);
                             }
@@ -393,11 +419,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                         else {
                             boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut));
+                                new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
 
                             if (reserved) {
                                 GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, msg0.received(), true);
+                                    connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
 
                                 fut.onDone(client);
                             }
@@ -465,6 +491,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
              * @param node Node.
              * @param rcvCnt Number of received messages..
              * @param sndRes If {@code true} sends response for recovery handshake.
+             * @param createClient If {@code true} creates NIO communication client.
              * @return Client.
              */
             private GridTcpNioCommunicationClient connected(
@@ -472,7 +499,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 GridNioSession ses,
                 ClusterNode node,
                 long rcvCnt,
-                boolean sndRes) {
+                boolean sndRes,
+                boolean createClient) {
                 recovery.onHandshake(rcvCnt);
 
                 ses.recoveryDescriptor(recovery);
@@ -484,12 +512,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 recovery.connected();
 
-                GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log);
+                GridTcpNioCommunicationClient client = null;
+
+                if (createClient) {
+                    client = new GridTcpNioCommunicationClient(ses, log);
 
-                GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+                    GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
 
-                assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
+                    assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
                         ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']';
+                }
 
                 return client;
             }
@@ -517,22 +549,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final GridFutureAdapter<GridCommunicationClient> fut;
 
+                /** */
+                private final boolean createClient;
+
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
                  * @param rmtNode Remote node.
                  * @param msg Handshake message.
+                 * @param createClient If {@code true} creates NIO communication client..
                  * @param fut Connect future.
                  */
                 ConnectClosure(GridNioSession ses,
                     GridNioRecoveryDescriptor recoveryDesc,
                     ClusterNode rmtNode,
                     HandshakeMessage msg,
+                    boolean createClient,
                     GridFutureAdapter<GridCommunicationClient> fut) {
                     this.ses = ses;
                     this.recoveryDesc = recoveryDesc;
                     this.rmtNode = rmtNode;
                     this.msg = msg;
+                    this.createClient = createClient;
                     this.fut = fut;
                 }
 
@@ -545,7 +583,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                     msgFut.get();
 
                                     GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, msg.received(), false);
+                                        connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
 
                                     fut.onDone(client);
                                 }
@@ -594,6 +632,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Local port range. */
     private int locPortRange = DFLT_PORT_RANGE;
 
+    /** Local port which node uses to accept shared memory connections. */
+    private int shmemPort = DFLT_SHMEM_PORT;
+
     /** Allocate direct buffer or heap buffer. */
     private boolean directBuf = true;
 
@@ -635,6 +676,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** NIO server. */
     private GridNioServer<Message> nioSrvr;
 
+    /** Shared memory server. */
+    private IpcSharedMemoryServerEndpoint shmemSrv;
+
     /** {@code TCP_NODELAY} option value for created sockets. */
     private boolean tcpNoDelay = DFLT_TCP_NODELAY;
 
@@ -647,6 +691,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Socket write timeout. */
     private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
 
+    /** Shared memory accept worker. */
+    private ShmemAcceptWorker shmemAcceptWorker;
+
     /** Idle client worker. */
     private IdleClientWorker idleClientWorker;
 
@@ -659,6 +706,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Recovery worker. */
     private RecoveryWorker recoveryWorker;
 
+    /** Shared memory workers. */
+    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
+
     /** Clients. */
     private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
 
@@ -668,6 +718,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Bound port. */
     private int boundTcpPort = -1;
 
+    /** Bound port for shared memory server. */
+    private int boundTcpShmemPort = -1;
+
     /** Count of selectors to use in TCP server. */
     private int selectorsCnt = DFLT_SELECTORS_CNT;
 
@@ -811,6 +864,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * Sets local port to accept shared memory connections.
+     * <p>
+     * If set to {@code -1} shared memory communication will be disabled.
+     * <p>
+     * If not provided, default value is {@link #DFLT_SHMEM_PORT}.
+     *
+     * @param shmemPort Port number.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setSharedMemoryPort(int shmemPort) {
+        this.shmemPort = shmemPort;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getSharedMemoryPort() {
+        return shmemPort;
+    }
+
+    /**
      * Sets maximum idle connection timeout upon which a connection
      * to client will be closed.
      * <p>
@@ -1179,6 +1251,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
+        assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
         assertParameter(reconCnt > 0, "reconnectCnt > 0");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
         assertParameter(minBufferedMsgCnt >= 0, "minBufferedMsgCnt >= 0");
@@ -1204,6 +1277,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         try {
+            shmemSrv = resetShmemServer();
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Failed to start shared memory communication server.", e);
+        }
+
+        try {
             // This method potentially resets local port to the value
             // local node was bound to.
             nioSrvr = resetNioServer();
@@ -1223,6 +1303,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 createSpiAttributeName(ATTR_ADDRS), addrs.get1(),
                 createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(),
                 createSpiAttributeName(ATTR_PORT), boundTcpPort,
+                createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null,
                 createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
         }
         catch (IOException | IgniteCheckedException e) {
@@ -1251,6 +1332,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("tcpNoDelay", tcpNoDelay));
             log.debug(configInfo("sockSndBuf", sockSndBuf));
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
+            log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
             log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt));
             log.debug(configInfo("connTimeout", connTimeout));
@@ -1272,6 +1354,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
 
+        if (shmemSrv != null) {
+            shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
+
+            new IgniteThread(shmemAcceptWorker).start();
+        }
+
         nioSrvr.start();
 
         idleClientWorker = new IdleClientWorker();
@@ -1301,6 +1389,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
         spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP);
 
+        // SPI can start without shmem port.
+        if (boundTcpShmemPort > 0)
+            spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);
+
         spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         ctxInitLatch.countDown();
@@ -1341,7 +1433,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         // If configured TCP port is busy, find first available in range.
         for (int port = locPort; port < locPort + locPortRange; port++) {
             try {
-                MessageFactory messageFactory = new MessageFactory() {
+                MessageFactory msgFactory = new MessageFactory() {
                     private MessageFactory impl;
 
                     @Nullable @Override public Message create(byte type) {
@@ -1354,7 +1446,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                MessageFormatter messageFormatter = new MessageFormatter() {
+                MessageFormatter msgFormatter = new MessageFormatter() {
                     private MessageFormatter impl;
 
                     @Override public MessageWriter writer() {
@@ -1376,7 +1468,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter);
+                GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter);
 
                 IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
                     @Override public boolean apply(Message msg) {
@@ -1403,7 +1495,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .writeTimeout(sockWriteTimeout)
                         .filters(new GridNioCodecFilter(parser, log, true),
                             new GridConnectionBytesVerifyFilter(log))
-                        .messageFormatter(messageFormatter)
+                        .messageFormatter(msgFormatter)
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .build();
 
@@ -1435,6 +1527,55 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
     }
 
+    /**
+     * Creates new shared memory communication server.
+     * @return Server.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
+        if (boundTcpShmemPort >= 0)
+            throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);
+
+        if (shmemPort == -1 || U.isWindows())
+            return null;
+
+        IgniteCheckedException lastEx = null;
+
+        // If configured TCP port is busy, find first available in range.
+        for (int port = shmemPort; port < shmemPort + locPortRange; port++) {
+            try {
+                IpcSharedMemoryServerEndpoint srv =
+                    new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName);
+
+                srv.setPort(port);
+
+                srv.omitOutOfResourcesWarning(true);
+
+                srv.start();
+
+                boundTcpShmemPort = port;
+
+                // Ack Port the TCP server was bound to.
+                if (log.isInfoEnabled())
+                    log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
+                        ", locHost=" + locHost + ']');
+
+                return srv;
+            }
+            catch (IgniteCheckedException e) {
+                lastEx = e;
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
+                        ", locHost=" + locHost + ']');
+            }
+        }
+
+        // If free port wasn't found.
+        throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
+            locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
+    }
+
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
         assert isNodeStopping();
@@ -1445,6 +1586,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
+        U.cancel(shmemAcceptWorker);
+        U.join(shmemAcceptWorker, log);
+
         U.interrupt(idleClientWorker);
         U.interrupt(clientFlushWorker);
         U.interrupt(sockTimeoutWorker);
@@ -1455,6 +1599,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         U.join(sockTimeoutWorker, log);
         U.join(recoveryWorker, log);
 
+        U.cancel(shmemWorkers);
+        U.join(shmemWorkers, log);
+
+        shmemWorkers.clear();
+
         // Force closing on stop (safety).
         for (GridCommunicationClient client : clients.values())
             client.forceClose();
@@ -1665,13 +1814,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
         assert node != null;
 
-        if (getSpiContext().localNode() == null)
+        Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
+
+        ClusterNode locNode = getSpiContext().localNode();
+
+        if (locNode == null)
             throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
 
+        // If remote node has shared memory server enabled and has the same set of MACs
+        // then we are likely to run on the same host and shared memory communication could be tried.
+        if (shmemPort != null && U.sameMacs(locNode, node)) {
+            try {
+                return createShmemClient(node, shmemPort);
+            }
+            catch (IgniteCheckedException e) {
+                if (e.hasCause(IpcOutOfSystemResourcesException.class))
+                    // Has cause or is itself the IpcOutOfSystemResourcesException.
+                    LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
+                else if (getSpiContext().node(node.id()) != null)
+                    LT.warn(log, null, e.getMessage());
+                else if (log.isDebugEnabled())
+                    log.debug("Failed to establish shared memory connection with local node (node has left): " +
+                        node.id());
+            }
+        }
+
         return createTcpClient(node);
     }
 
     /**
+     * @param node Node.
+     * @param port Port.
+     * @return Client.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException {
+        int attempt = 1;
+
+        int connectAttempts = 1;
+
+        long connTimeout0 = connTimeout;
+
+        while (true) {
+            GridCommunicationClient client;
+
+            try {
+                client = new GridShmemCommunicationClient(metricsLsnr,
+                    port,
+                    connTimeout,
+                    log,
+                    getSpiContext().messageFormatter());
+            }
+            catch (IgniteCheckedException e) {
+                // Reconnect for the second time, if connection is not established.
+                if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
+                    connectAttempts++;
+
+                    continue;
+                }
+
+                throw e;
+            }
+
+            try {
+                safeHandshake(client, null, node.id(), connTimeout0);
+            }
+            catch (HandshakeTimeoutException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                        ", err=" + e.getMessage() + ", client=" + client + ']');
+
+                client.forceClose();
+
+                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
+                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
+                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+                            ", err=" + e.getMessage() + ", client=" + client + ']');
+
+                    throw e;
+                }
+                else {
+                    attempt++;
+
+                    connTimeout0 *= 2;
+
+                    continue;
+                }
+            }
+            catch (IgniteCheckedException | RuntimeException | Error e) {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
+
+                client.forceClose();
+
+                throw e;
+            }
+
+            return client;
+        }
+    }
+
+    /**
      * Establish TCP connection to remote node and returns client.
      *
      * @param node Remote node.
@@ -2154,6 +2400,144 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * This worker takes responsibility to shut the server down when stopping,
+     * No other thread shall stop passed server.
+     */
+    private class ShmemAcceptWorker extends GridWorker {
+        /** */
+        private final IpcSharedMemoryServerEndpoint srv;
+
+        /**
+         * @param srv Server.
+         */
+        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
+            super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);
+
+            this.srv = srv;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            try {
+                while (!Thread.interrupted()) {
+                    ShmemWorker e = new ShmemWorker(srv.accept());
+
+                    shmemWorkers.add(e);
+
+                    new IgniteThread(e).start();
+                }
+            }
+            catch (IgniteCheckedException e) {
+                if (!isCancelled())
+                    U.error(log, "Shmem server failed.", e);
+            }
+            finally {
+                srv.close();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            super.cancel();
+
+            srv.close();
+        }
+    }
+
+    /**
+     *
+     */
+    private class ShmemWorker extends GridWorker {
+        /** */
+        private final IpcEndpoint endpoint;
+
+        /**
+         * @param endpoint Endpoint.
+         */
+        private ShmemWorker(IpcEndpoint endpoint) {
+            super(gridName, "shmem-worker", TcpCommunicationSpi.this.log);
+
+            this.endpoint = endpoint;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            try {
+                MessageFactory msgFactory = new MessageFactory() {
+                    private MessageFactory impl;
+
+                    @Nullable @Override public Message create(byte type) {
+                        if (impl == null)
+                            impl = getSpiContext().messageFactory();
+
+                        assert impl != null;
+
+                        return impl.create(type);
+                    }
+                };
+
+                MessageFormatter msgFormatter = new MessageFormatter() {
+                    private MessageFormatter impl;
+
+                    @Override public MessageWriter writer() {
+                        if (impl == null)
+                            impl = getSpiContext().messageFormatter();
+
+                        assert impl != null;
+
+                        return impl.writer();
+                    }
+
+                    @Override public MessageReader reader(MessageFactory factory) {
+                        if (impl == null)
+                            impl = getSpiContext().messageFormatter();
+
+                        assert impl != null;
+
+                        return impl.reader(factory);
+                    }
+                };
+
+                IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
+                    metricsLsnr,
+                    log,
+                    endpoint,
+                    srvLsnr,
+                    msgFormatter,
+                    new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true),
+                    new GridConnectionBytesVerifyFilter(log)
+                );
+
+                adapter.serve();
+            }
+            finally {
+                shmemWorkers.remove(this);
+
+                endpoint.close();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            super.cancel();
+
+            endpoint.close();
+        }
+
+        /** @{@inheritDoc} */
+        @Override protected void cleanup() {
+            super.cleanup();
+
+            endpoint.close();
+        }
+
+        /** @{@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ShmemWorker.class, this);
+        }
+    }
+
+    /**
      *
      */
     private class IdleClientWorker extends IgniteSpiThread {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 5c80e6e..3c6b64e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -44,6 +44,14 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
     public int getLocalPort();
 
     /**
+     * Gets local port for shared memory communication.
+     *
+     * @return Port number.
+     */
+    @MXBeanDescription("Shared memory endpoint port number.")
+    public int getSharedMemoryPort();
+
+    /**
      * Gets maximum number of local ports tried if all previously
      * tried ports are occupied.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 96abe5f..8031315 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -50,6 +50,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
 
         commSpi.setSocketWriteTimeout(1000);
+        commSpi.setSharedMemoryPort(-1);
 
         cfg.setCommunicationSpi(commSpi);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index ed9e0cf..744635d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -115,6 +115,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         commSpi.setLocalAddress("127.0.0.1");
         commSpi.setLocalPort(commLocPort);
         commSpi.setLocalPortRange(1);
+        commSpi.setSharedMemoryPort(-1);
 
         cfg.setCommunicationSpi(commSpi);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index ea51aff..2d3f506 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -37,10 +37,23 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
     /** */
     public static final int IDLE_CONN_TIMEOUT = 2000;
 
+    /** */
+    private final boolean useShmem;
+
+    /**
+     * @param useShmem Use shared mem flag.
+     */
+    protected GridTcpCommunicationSpiAbstractTest(boolean useShmem) {
+        this.useShmem = useShmem;
+    }
+
     /** {@inheritDoc} */
     @Override protected CommunicationSpi getSpi(int idx) {
         TcpCommunicationSpi spi = new TcpCommunicationSpi();
 
+        if (!useShmem)
+            spi.setSharedMemoryPort(-1);
+
         spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
         spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT);
         spi.setTcpNoDelay(tcpNoDelay());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index c038ee7..26e1120 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -181,8 +181,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
         if (load) {
             loadFut = GridTestUtils.runMultiThreadedAsync(new Callable<Long>() {
-                @Override
-                public Long call() throws Exception {
+                @Override public Long call() throws Exception {
                     long dummyRes = 0;
 
                     List<String> list = new ArrayList<>();
@@ -300,6 +299,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
         spi.setLocalPort(port++);
         spi.setIdleConnectionTimeout(60_000);
         spi.setConnectTimeout(10_000);
+        spi.setSharedMemoryPort(-1);
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index e7ae957..9909d76 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -55,6 +55,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     /** Message id sequence. */
     private AtomicLong msgId = new AtomicLong();
 
+    /** */
+    private final boolean useShmem;
+
     /** SPI resources. */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
 
@@ -80,9 +83,12 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     }
 
     /**
+     * @param useShmem Use shared mem.
      */
-    public GridTcpCommunicationSpiMultithreadedSelfTest() {
+    protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
         super(false);
+
+        this.useShmem = useShmem;
     }
 
     /**
@@ -413,6 +419,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     private CommunicationSpi<Message> newCommunicationSpi() {
         TcpCommunicationSpi spi = new TcpCommunicationSpi();
 
+        if (!useShmem)
+            spi.setSharedMemoryPort(-1);
+
         spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
         spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
new file mode 100644
index 0000000..590b426
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+/**
+ *
+ */
+public class GridTcpCommunicationSpiMultithreadedShmemTest extends GridTcpCommunicationSpiMultithreadedSelfTest {
+    /** */
+    public GridTcpCommunicationSpiMultithreadedShmemTest() {
+        super(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index c0f0b11..1a4ba22 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -324,6 +324,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
         spi.setTcpNoDelay(true);
         spi.setAckSendThreshold(ackCnt);
         spi.setMessageQueueLimit(queueLimit);
+        spi.setSharedMemoryPort(-1);
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 7463388..5d3afd9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -608,6 +608,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     protected TcpCommunicationSpi getSpi(int idx) {
         TcpCommunicationSpi spi = new TcpCommunicationSpi();
 
+        spi.setSharedMemoryPort(-1);
         spi.setLocalPort(port++);
         spi.setIdleConnectionTimeout(10_000);
         spi.setConnectTimeout(10_000);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
new file mode 100644
index 0000000..5746a3c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.testframework.junits.spi.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiShmemSelfTest extends GridTcpCommunicationSpiAbstractTest {
+    /**
+     *
+     */
+    public GridTcpCommunicationSpiShmemSelfTest() {
+        super(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean tcpNoDelay() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
index 32bced2..c27a86f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
@@ -24,6 +24,13 @@ import org.apache.ignite.testframework.junits.spi.*;
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
 public class GridTcpCommunicationSpiTcpSelfTest extends GridTcpCommunicationSpiAbstractTest {
+    /**
+     *
+     */
+    public GridTcpCommunicationSpiTcpSelfTest() {
+        super(false);
+    }
+
     /** {@inheritDoc} */
     @Override protected boolean tcpNoDelay() {
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 1d3bfcd..ff86bda 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -38,10 +38,12 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpSelfTest.class));
         suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpNoDelayOffSelfTest.class));
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiShmemSelfTest.class));
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiStartStopSelfTest.class));
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class));
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class));
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
index 9bcd5de..a1535ed 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -186,6 +188,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
             cfg.setFileSystemConfiguration(igfsCfg);
             cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
             cfg.setLocalHost(U.getLocalHost().getHostAddress());
+            cfg.setCommunicationSpi(communicationSpi());
 
             G.start(cfg);
         }
@@ -211,6 +214,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
         cfg.setFileSystemConfiguration(igfsConfiguration(gridName));
         cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
         cfg.setLocalHost("127.0.0.1");
+        cfg.setCommunicationSpi(communicationSpi());
 
         return cfg;
     }
@@ -270,6 +274,15 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
         return cfg;
     }
 
+    /** @return Communication SPI. */
+    private CommunicationSpi communicationSpi() {
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        return commSpi;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         G.stopAll(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index b089995..8c33679 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.processors.hadoop.igfs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -279,6 +281,8 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
         cfg.setFileSystemConfiguration(igfsCfg);
         cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
 
+        cfg.setCommunicationSpi(communicationSpi());
+
         G.start(cfg);
     }
 
@@ -314,6 +318,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
         cfg.setCacheConfiguration(cacheConfiguration());
         cfg.setFileSystemConfiguration(fsConfiguration(gridName));
         cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+        cfg.setCommunicationSpi(communicationSpi());
 
         return cfg;
     }
@@ -371,6 +376,15 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
         return cfg;
     }
 
+    /** @return Communication SPI. */
+    private CommunicationSpi communicationSpi() {
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        return commSpi;
+    }
+
     /**
      * Case #SecondaryFileSystemProvider(null, path)
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..7fda532 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -94,6 +94,12 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
 
         cfg.setHadoopConfiguration(hadoopConfiguration(gridName));
 
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
         TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
 
         discoSpi.setIpFinder(IP_FINDER);


[07/29] incubator-ignite git commit: ignite-989 Fix multicast for client discovery

Posted by ag...@apache.org.
ignite-989 Fix multicast for client discovery


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9ad3617d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9ad3617d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9ad3617d

Branch: refs/heads/ignite-389-ipc
Commit: 9ad3617d39f02eb98ef01f332e2aac20f97870c2
Parents: 97d0bc1
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jun 3 12:06:51 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 3 12:11:26 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  31 -----
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  56 +++++++-
 .../tcp/ipfinder/TcpDiscoveryIpFinder.java      |  10 +-
 .../TcpDiscoveryMulticastIpFinder.java          |  47 +++++--
 .../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   1 +
 .../gce/TcpDiscoveryGoogleStorageIpFinder.java  |  43 ++++---
 8 files changed, 244 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e672d64..6a5b644 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -166,6 +166,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         msgWorker = new MessageWorker();
         msgWorker.start();
 
+        if (spi.ipFinder.isShared())
+            registerLocalNodeAddress();
+
         try {
             joinLatch.await();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 57c13d6..698835f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -272,37 +272,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         spi.printStartInfo();
     }
 
-    /**
-     * @throws IgniteSpiException If failed.
-     */
-    @SuppressWarnings("BusyWait")
-    private void registerLocalNodeAddress() throws IgniteSpiException {
-        // Make sure address registration succeeded.
-        while (true) {
-            try {
-                spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
-
-                // Success.
-                break;
-            }
-            catch (IllegalStateException e) {
-                throw new IgniteSpiException("Failed to register local node address with IP finder: " +
-                    locNode.socketAddresses(), e);
-            }
-            catch (IgniteSpiException e) {
-                LT.error(log, e, "Failed to register local node address in IP finder on start " +
-                    "(retrying every 2000 ms).");
-            }
-
-            try {
-                U.sleep(2000);
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw new IgniteSpiException("Thread has been interrupted.", e);
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
         spiCtx.registerPort(tcpSrvr.port, TCP);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f285279..b7e9e53 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.spi.discovery.tcp;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.discovery.*;
 import org.apache.ignite.spi.discovery.tcp.internal.*;
@@ -58,7 +60,7 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
-     *
+     * @return Local node ID.
      */
     public UUID getLocalNodeId() {
         return spi.getLocalNodeId();
@@ -78,42 +80,47 @@ abstract class TcpDiscoveryImpl {
     public abstract void dumpDebugInfo(IgniteLogger log);
 
     /**
-     *
+     * @return SPI state string.
      */
     public abstract String getSpiState();
 
     /**
-     *
+     * @return Message worker queue current size.
      */
     public abstract int getMessageWorkerQueueSize();
 
     /**
-     *
+     * @return Coordinator ID.
      */
     public abstract UUID getCoordinator();
 
     /**
-     *
+     * @return Collection of remote nodes.
      */
     public abstract Collection<ClusterNode> getRemoteNodes();
 
     /**
      * @param nodeId Node id.
+     * @return Node with given ID or {@code null} if node is not found.
      */
     @Nullable public abstract ClusterNode getNode(UUID nodeId);
 
     /**
      * @param nodeId Node id.
+     * @return {@code true} if node alive, {@code false} otherwise.
      */
     public abstract boolean pingNode(UUID nodeId);
 
     /**
+     * Tells discovery SPI to disconnect from topology.
      *
+     * @throws IgniteSpiException If failed.
      */
     public abstract void disconnect() throws IgniteSpiException;
 
     /**
      * @param msg Message.
+     * @throws IgniteException If failed.
      */
     public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
 
@@ -124,16 +131,18 @@ abstract class TcpDiscoveryImpl {
 
     /**
      * @param gridName Grid name.
+     * @throws IgniteSpiException If failed.
      */
     public abstract void spiStart(@Nullable String gridName) throws IgniteSpiException;
 
     /**
-     *
+     * @throws IgniteSpiException If failed.
      */
     public abstract void spiStop() throws IgniteSpiException;
 
     /**
      * @param spiCtx Spi context.
+     * @throws IgniteSpiException If failed.
      */
     public abstract void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException;
 
@@ -164,7 +173,40 @@ abstract class TcpDiscoveryImpl {
     public abstract void brakeConnection();
 
     /**
-     * FOR TEST PURPOSE ONLY!
+     * <strong>FOR TEST ONLY!!!</strong>
+     *
+     * @return Worker thread.
      */
     protected abstract IgniteSpiThread workerThread();
+
+    /**
+     * @throws IgniteSpiException If failed.
+     */
+    @SuppressWarnings("BusyWait")
+    protected final void registerLocalNodeAddress() throws IgniteSpiException {
+        // Make sure address registration succeeded.
+        while (true) {
+            try {
+                spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+                // Success.
+                break;
+            }
+            catch (IllegalStateException e) {
+                throw new IgniteSpiException("Failed to register local node address with IP finder: " +
+                    locNode.socketAddresses(), e);
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to register local node address in IP finder on start " +
+                    "(retrying every 2000 ms).");
+            }
+
+            try {
+                U.sleep(2000);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
index 51ad7b4..95758e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
@@ -31,7 +31,7 @@ public interface TcpDiscoveryIpFinder {
      * method is completed, SPI context can be stored for future access.
      *
      * @param spiCtx Spi context.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException;
 
@@ -46,7 +46,7 @@ public interface TcpDiscoveryIpFinder {
      * Initializes addresses discovery SPI binds to.
      *
      * @param addrs Addresses discovery SPI binds to.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
 
@@ -54,7 +54,7 @@ public interface TcpDiscoveryIpFinder {
      * Gets all addresses registered in this finder.
      *
      * @return All known addresses, potentially empty, but never {@code null}.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException;
 
@@ -76,7 +76,7 @@ public interface TcpDiscoveryIpFinder {
      * is already registered.
      *
      * @param addrs Addresses to register. Not {@code null} and not empty.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
 
@@ -87,7 +87,7 @@ public interface TcpDiscoveryIpFinder {
      * registered quietly (just no-op).
      *
      * @param addrs Addresses to unregister. Not {@code null} and not empty.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 45d0816..a992620 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -26,6 +26,8 @@ import org.apache.ignite.marshaller.*;
 import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.jetbrains.annotations.*;
 
@@ -254,6 +256,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
                 "(it is recommended in production to specify at least one address in " +
                 "TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)");
 
+        boolean clientMode;
+
+        if (ignite != null) { // Can be null if used in tests without starting Ignite.
+            DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi();
+
+            if (!(discoSpi instanceof TcpDiscoverySpi))
+                throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " +
+                    "TcpDiscoverySpi: " + discoSpi);
+
+            clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode();
+        }
+        else
+            clientMode = false;
+
         InetAddress mcastAddr;
 
         try {
@@ -296,7 +312,8 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
             if (!addr.isLoopbackAddress()) {
                 try {
-                    addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
+                    if (!clientMode)
+                        addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
 
                     reqItfs.add(addr);
                 }
@@ -309,20 +326,24 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
             }
         }
 
-        if (addrSnds.isEmpty()) {
-            try {
-                // Create non-bound socket if local host is loopback or failed to create sockets explicitly
-                // bound to interfaces.
-                addrSnds.add(new AddressSender(mcastAddr, null, addrs));
-            }
-            catch (IOException e) {
-                throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
-                    ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+        if (!clientMode) {
+            if (addrSnds.isEmpty()) {
+                try {
+                    // Create non-bound socket if local host is loopback or failed to create sockets explicitly
+                    // bound to interfaces.
+                    addrSnds.add(new AddressSender(mcastAddr, null, addrs));
+                }
+                catch (IOException e) {
+                    throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
+                        ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+                }
             }
-        }
 
-        for (AddressSender addrSnd :addrSnds)
-            addrSnd.start();
+            for (AddressSender addrSnd : addrSnds)
+                addrSnd.start();
+        }
+        else
+            assert addrSnds.isEmpty() : addrSnds;
 
         Collection<InetSocketAddress> ret;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
new file mode 100644
index 0000000..d1b6232
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
+    /** */
+    private boolean forceSrv;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setLocalHost("127.0.0.1");
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(new TcpDiscoveryMulticastIpFinder());
+
+        if (getTestGridName(1).equals(gridName)) {
+            cfg.setClientMode(true);
+
+            spi.setForceServerMode(forceSrv);
+        }
+
+        cfg.setDiscoverySpi(spi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinWithMulticast() throws Exception {
+        joinWithMulticast();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinWithMulticastForceServer() throws Exception {
+        forceSrv = true;
+
+        joinWithMulticast();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void joinWithMulticast() throws Exception {
+        Ignite ignite0 = startGrid(0);
+
+        assertSpi(ignite0, false);
+
+        Ignite ignite1 = startGrid(1);
+
+        assertSpi(ignite1, !forceSrv);
+
+        assertTrue(ignite1.configuration().isClientMode());
+
+        assertEquals(2, ignite0.cluster().nodes().size());
+        assertEquals(2, ignite1.cluster().nodes().size());
+
+        Ignite ignite2 = startGrid(2);
+
+        assertSpi(ignite2, false);
+
+        assertEquals(3, ignite0.cluster().nodes().size());
+        assertEquals(3, ignite1.cluster().nodes().size());
+        assertEquals(3, ignite2.cluster().nodes().size());
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param client Expected client mode flag.
+     */
+    private void assertSpi(Ignite ignite, boolean client) {
+        DiscoverySpi spi = ignite.configuration().getDiscoverySpi();
+
+        assertSame(TcpDiscoverySpi.class, spi.getClass());
+
+        TcpDiscoverySpi spi0 = (TcpDiscoverySpi)spi;
+
+        assertSame(TcpDiscoveryMulticastIpFinder.class, spi0.getIpFinder().getClass());
+
+        assertEquals(client, spi0.isClientMode());
+
+        Collection<Object> addrSnds = GridTestUtils.getFieldValue(spi0.getIpFinder(), "addrSnds");
+
+        assertNotNull(addrSnds);
+
+        if (client)
+            assertTrue(addrSnds.isEmpty()); // Check client does not send its address.
+        else
+            assertFalse(addrSnds.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index dc35b24..ea5a7ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -53,6 +53,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
+        suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
index b496f60..48991e8 100644
--- a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
+++ b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
@@ -68,34 +68,37 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
  * Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}.
  */
 public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapter {
-    /* Default object's content. */
+    /** Default object's content. */
     private final static ByteArrayInputStream OBJECT_CONTENT =  new ByteArrayInputStream(new byte[0]);
 
     /** Grid logger. */
     @LoggerResource
     private IgniteLogger log;
 
-    /* Google Cloud Platform's project name.*/
+    /** Google Cloud Platform's project name.*/
     private String projectName;
 
-    /* Google Storage bucket name. */
+    /** Google Storage bucket name. */
     private String bucketName;
 
-    /* Service account p12 private key file name. */
-    private String serviceAccountP12FilePath;
+    /** Service account p12 private key file name. */
+    private String srvcAccountP12FilePath;
 
-    /* Service account id. */
-    private String serviceAccountId;
+    /** Service account id. */
+    private String srvcAccountId;
 
-    /* Google storage. */
+    /** Google storage. */
     private Storage storage;
 
-    /* Init routine guard. */
+    /** Init routine guard. */
     private final AtomicBoolean initGuard = new AtomicBoolean();
 
-    /* Init routine latch. */
+    /** Init routine latch. */
     private final CountDownLatch initLatch = new CountDownLatch(1);
 
+    /**
+     *
+     */
     public TcpDiscoveryGoogleStorageIpFinder() {
         setShared(true);
     }
@@ -221,7 +224,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
      */
     @IgniteSpiConfiguration(optional = false)
     public void setServiceAccountP12FilePath(String p12FileName) {
-        this.serviceAccountP12FilePath = p12FileName;
+        this.srvcAccountP12FilePath = p12FileName;
     }
 
     /**
@@ -235,7 +238,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
      */
     @IgniteSpiConfiguration(optional = false)
     public void setServiceAccountId(String id) {
-        this.serviceAccountId = id;
+        this.srvcAccountId = id;
     }
 
     /**
@@ -245,13 +248,13 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
      */
     private void init() throws IgniteSpiException {
         if (initGuard.compareAndSet(false, true)) {
-            if (serviceAccountId == null ||
-                serviceAccountP12FilePath == null ||
+            if (srvcAccountId == null ||
+                srvcAccountP12FilePath == null ||
                 projectName == null ||
                 bucketName == null) {
                 throw new IgniteSpiException(
                     "One or more of the required parameters is not set [serviceAccountId=" +
-                        serviceAccountId + ", serviceAccountP12FilePath=" + serviceAccountP12FilePath + ", projectName=" +
+                        srvcAccountId + ", serviceAccountP12FilePath=" + srvcAccountP12FilePath + ", projectName=" +
                         projectName + ", bucketName=" + bucketName + "]");
             }
 
@@ -265,12 +268,12 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
                     throw new IgniteSpiException(e);
                 }
 
-                GoogleCredential credential;
+                GoogleCredential cred;
 
                 try {
-                    credential = new GoogleCredential.Builder().setTransport(httpTransport)
-                        .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(serviceAccountId)
-                        .setServiceAccountPrivateKeyFromP12File(new File(serviceAccountP12FilePath))
+                    cred = new GoogleCredential.Builder().setTransport(httpTransport)
+                        .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(srvcAccountId)
+                        .setServiceAccountPrivateKeyFromP12File(new File(srvcAccountP12FilePath))
                         .setServiceAccountScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL)).build();
 
                 }
@@ -279,7 +282,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
                 }
 
                 try {
-                    storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), credential)
+                    storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), cred)
                         .setApplicationName(projectName).build();
                 }
                 catch (Exception e) {


[06/29] incubator-ignite git commit: IGNITE-983: Added tests.

Posted by ag...@apache.org.
IGNITE-983: Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bafad996
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bafad996
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bafad996

Branch: refs/heads/ignite-389-ipc
Commit: bafad99677ff63431adfc9ca9c3e3a3897447c25
Parents: 11c0b90
Author: AKuznetsov <ak...@gridgain.com>
Authored: Wed Jun 3 15:25:49 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Wed Jun 3 15:25:49 2015 +0700

----------------------------------------------------------------------
 ...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++++++++++++++
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 2 files changed, 106 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bafad996/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
new file mode 100644
index 0000000..967a466
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheConfigurationPrimitiveTypesSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimitiveTypes() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("c1");
+
+        ccfg.setIndexedTypes(
+            byte.class, byte.class,
+            short.class, short.class,
+            int.class, int.class,
+            long.class, long.class,
+            float.class, float.class,
+            double.class, double.class,
+            boolean.class, boolean.class);
+
+        IgniteCache<Object, Object> cache = ignite.getOrCreateCache(ccfg);
+
+        byte b = 1;
+        cache.put(b, b);
+
+        short s = 2;
+        cache.put(s, s);
+
+        int i = 3;
+        cache.put(i, i);
+
+        long l = 4;
+        cache.put(l, l);
+
+        float f = 5;
+        cache.put(f, f);
+
+        double d = 6;
+        cache.put(d, d);
+
+        boolean bool = true;
+        cache.put(bool, bool);
+
+        assert cache.query(new ScanQuery<>()).getAll().size() == 7;
+
+        assert cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size() == 1;
+        assert cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size() == 1;
+        assert cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size() == 1;
+        assert cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size() == 1;
+        assert cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size() == 1;
+        assert cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size() == 1;
+        assert cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size() == 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bafad996/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 240caff..67ebda9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -51,6 +51,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
 
         suite.addTestSuite(CacheConfigurationP2PTest.class);
 
+        suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class);
+
         return suite;
     }
 }


[17/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-981' into ignite-sprint-5

Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-981' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae5189ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae5189ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae5189ab

Branch: refs/heads/ignite-389-ipc
Commit: ae5189ab9513120258add9769e8cfc88e1c6aad8
Parents: 1b12bb4 1603fe5
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:42:36 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:42:36 2015 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       |  8 ++-
 .../GridDhtPartitionsExchangeFuture.java        | 10 +++-
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 62 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[22/29] incubator-ignite git commit: # IGNITE-983. Minor fix after review.

Posted by ag...@apache.org.
# IGNITE-983. Minor fix after review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bf3203a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bf3203a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bf3203a4

Branch: refs/heads/ignite-389-ipc
Commit: bf3203a42fbd92e5960c29f672351d20cd756897
Parents: 46b2447
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:50:16 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:50:16 2015 +0700

----------------------------------------------------------------------
 ...gniteCacheConfigurationPrimitiveTypesSelfTest.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf3203a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
index 967a466..e90f10c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
@@ -93,12 +93,12 @@ public class IgniteCacheConfigurationPrimitiveTypesSelfTest extends GridCommonAb
 
         assert cache.query(new ScanQuery<>()).getAll().size() == 7;
 
-        assert cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size() == 1;
-        assert cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size() == 1;
-        assert cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size() == 1;
-        assert cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size() == 1;
-        assert cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size() == 1;
-        assert cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size() == 1;
-        assert cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size() == 1;
+        assertEquals(cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size(), 1);
+        assertEquals(cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size(), 1);
+        assertEquals(cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size(), 1);
+        assertEquals(cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size(), 1);
+        assertEquals(cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size(), 1);
+        assertEquals(cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size(), 1);
+        assertEquals(cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size(), 1);
     }
 }


[10/29] incubator-ignite git commit: Merge branch 'ignite-970' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-389-ipc

Posted by ag...@apache.org.
Merge branch 'ignite-970' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-389-ipc


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a329e901
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a329e901
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a329e901

Branch: refs/heads/ignite-389-ipc
Commit: a329e901d47419d5ab5e1db55dee6d2001f9d66e
Parents: 7ee51ba 7158fb6
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Jun 3 15:22:36 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Jun 3 15:22:36 2015 -0700

----------------------------------------------------------------------
 modules/core/pom.xml                            |   1 -
 .../util/nio/GridShmemCommunicationClient.java  | 151 +++++++
 .../communication/tcp/TcpCommunicationSpi.java  | 414 ++++++++++++++++++-
 .../tcp/TcpCommunicationSpiMBean.java           |   8 +
 .../IgniteCacheMessageRecoveryAbstractTest.java |   1 +
 .../communication/GridIoManagerBenchmark0.java  |   1 +
 .../spi/GridTcpSpiForwardingSelfTest.java       |   1 +
 .../GridTcpCommunicationSpiAbstractTest.java    |  13 +
 ...mmunicationSpiConcurrentConnectSelfTest.java |   4 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |  21 +-
 ...pCommunicationSpiMultithreadedShmemTest.java |  28 ++
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   1 +
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   1 +
 .../GridTcpCommunicationSpiShmemSelfTest.java   |  38 ++
 .../tcp/GridTcpCommunicationSpiTcpSelfTest.java |   7 +
 .../IgniteSpiCommunicationSelfTestSuite.java    |   2 +
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |  13 +
 ...oopSecondaryFileSystemConfigurationTest.java |  14 +
 ...IgniteHadoopFileSystemHandshakeSelfTest.java |   7 +
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |   7 +
 .../hadoop/HadoopAbstractSelfTest.java          |   6 +
 21 files changed, 718 insertions(+), 21 deletions(-)
----------------------------------------------------------------------



[20/29] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5

Posted by ag...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/922c1c44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/922c1c44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/922c1c44

Branch: refs/heads/ignite-389-ipc
Commit: 922c1c445b2519dbea1c7cb68145c05b2c063d41
Parents: e625709 7501025
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:14:20 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:14:20 2015 +0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/ignite/internal/IgniteKernal.java    | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------



[08/29] incubator-ignite git commit: ignite-981 Do not access cache in exchange future before cache is ready

Posted by ag...@apache.org.
ignite-981 Do not access cache in exchange future before cache is ready


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1603fe50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1603fe50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1603fe50

Branch: refs/heads/ignite-389-ipc
Commit: 1603fe502b03d5f3e57e7837e14f0d33af002236
Parents: 97d0bc1
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jun 3 13:21:32 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 3 13:34:01 2015 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       |  8 ++-
 .../GridDhtPartitionsExchangeFuture.java        | 10 +++-
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 62 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1ae4ae7..68652c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -740,7 +740,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']';
+                ", cache=" + cctx.name() +
+                ", started=" + cctx.started() +
+                ", stopping=" + stopping +
+                ", locNodeId=" + cctx.localNode().id() +
+                ", locName=" + cctx.gridName() + ']';
 
             GridDhtPartitionFullMap m = node2part;
 
@@ -758,6 +762,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
 
+        assert partMap != null;
+
         lock.writeLock().lock();
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index db43c6c..e0bfee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -902,8 +902,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             id.topologyVersion());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal())
-                m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+            if (!cacheCtx.isLocal()) {
+                AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+                boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
+
+                if (ready)
+                    m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+            }
         }
 
         // It is important that client topologies be added after contexts.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 095221e..db9e6a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -68,6 +69,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
     private boolean testAttribute = true;
 
     /** */
+    private boolean client;
+
+    /** */
     private boolean daemon;
 
     /**
@@ -85,6 +89,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        if (client) {
+            cfg.setClientMode(true);
+
+            ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+        }
+
         cfg.setUserAttributes(F.asMap(TEST_ATTRIBUTE_NAME, testAttribute));
 
         CacheConfiguration cacheCfg = new CacheConfiguration();
@@ -1024,4 +1034,56 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             stopGrid(nodeCount());
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStopWithClientJoin() throws Exception {
+        Ignite ignite1 = ignite(1);
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                client = true;
+
+                int iter = 0;
+
+                while (!stop.get()) {
+                    if (iter % 10 == 0)
+                        log.info("Client start/stop iteration: " + iter);
+
+                    iter++;
+
+                    try (Ignite ignite = startGrid(nodeCount())) {
+                        assertTrue(ignite.configuration().isClientMode());
+                    }
+                }
+
+                return null;
+            }
+        }, 1, "client-start-stop");
+
+        try {
+            long stopTime = U.currentTimeMillis() + 30_000;
+
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stopTime) {
+                if (iter % 10 == 0)
+                    log.info("Cache start/stop iteration: " + iter);
+
+                try (IgniteCache<Object, Object> cache = ignite1.getOrCreateCache("cache-" + iter)) {
+                    assertNotNull(cache);
+                }
+
+                iter++;
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        fut.get();
+    }
 }


[05/29] incubator-ignite git commit: IGNITE-983: Added support for primitive types.

Posted by ag...@apache.org.
IGNITE-983: Added support for primitive types.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/11c0b904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/11c0b904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/11c0b904

Branch: refs/heads/ignite-389-ipc
Commit: 11c0b904a934c31ac936a8793cd11bf34af7634b
Parents: 97d0bc1
Author: AKuznetsov <ak...@gridgain.com>
Authored: Wed Jun 3 14:26:49 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Wed Jun 3 14:26:49 2015 +0700

----------------------------------------------------------------------
 .../org/apache/ignite/configuration/CacheConfiguration.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11c0b904/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 2c7d8c1..8b1e1a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1664,7 +1664,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
             "Number of indexed types is expected to be even. Refer to method javadoc for details.");
 
-        this.indexedTypes = indexedTypes;
+        int len = indexedTypes.length;
+
+        Class<?>[] newIndexedTypes = new Class<?>[len];
+
+        for (int i = 0; i < len; i++)
+            newIndexedTypes[i] = U.box(indexedTypes[i]);
+
+        this.indexedTypes = newIndexedTypes;
 
         return this;
     }


[15/29] incubator-ignite git commit: # ignite-991 minor

Posted by ag...@apache.org.
# ignite-991 minor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5ee09f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5ee09f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5ee09f0

Branch: refs/heads/ignite-389-ipc
Commit: b5ee09f0db550dc6a29c03e07473465d4ac767ab
Parents: 38c084a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:34:19 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:34:19 2015 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtPartitionTopologyImpl.java     | 4 +++-
 .../dht/preloader/GridDhtPartitionsExchangeFuture.java          | 1 +
 .../cache/IgniteDynamicCacheWithConfigStartSelfTest.java        | 5 ++---
 3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index af121c3..2656990 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -668,7 +668,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", allIds=" + allIds + ", node2part=" + node2part + ", cache=" + cctx.name() + ']';
+                ", allIds=" + allIds +
+                ", node2part=" + node2part +
+                ", cache=" + cctx.name() + ']';
 
             Collection<UUID> nodeIds = part2node.get(p);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a03e2e8..fdaded1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -293,6 +293,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param cacheId Cache ID to check.
+     * @param topVer Topology version.
      * @return {@code True} if cache was added during this exchange.
      */
     public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
index dcd6a69..6386f8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -48,7 +48,7 @@ public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstrac
         cfg.setDiscoverySpi(discoSpi);
 
         if (client)
-            cfg.setCacheConfiguration(cacheConfiguration(gridName));
+            cfg.setCacheConfiguration(cacheConfiguration());
 
         cfg.setClientMode(client);
 
@@ -56,10 +56,9 @@ public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstrac
     }
 
     /**
-     * @param cacheName Cache name.
      * @return Cache configuration.
      */
-    protected CacheConfiguration cacheConfiguration(String cacheName) {
+    private CacheConfiguration cacheConfiguration() {
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
 
         ccfg.setIndexedTypes(String.class, String.class);


[09/29] incubator-ignite git commit: # IGNITE-983 Fixed logic to support null value.

Posted by ag...@apache.org.
# IGNITE-983 Fixed logic to support null value.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/51d4737a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/51d4737a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/51d4737a

Branch: refs/heads/ignite-389-ipc
Commit: 51d4737ab70a2962ec8ef3f300317e1223d52ab3
Parents: bafad99
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 00:17:27 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 00:17:27 2015 +0700

----------------------------------------------------------------------
 .../ignite/configuration/CacheConfiguration.java      | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51d4737a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 8b1e1a5..1aa4fd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1664,14 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
             "Number of indexed types is expected to be even. Refer to method javadoc for details.");
 
-        int len = indexedTypes.length;
+        if (indexedTypes != null) {
+            int len = indexedTypes.length;
 
-        Class<?>[] newIndexedTypes = new Class<?>[len];
+            Class<?>[] newIndexedTypes = new Class<?>[len];
 
-        for (int i = 0; i < len; i++)
-            newIndexedTypes[i] = U.box(indexedTypes[i]);
+            for (int i = 0; i < len; i++)
+                newIndexedTypes[i] = U.box(indexedTypes[i]);
 
-        this.indexedTypes = newIndexedTypes;
+            this.indexedTypes = newIndexedTypes;
+        }
+        else
+            this.indexedTypes = null;
 
         return this;
     }


[02/29] incubator-ignite git commit: # ignite-970

Posted by ag...@apache.org.
# ignite-970


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/39ce1cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/39ce1cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/39ce1cbf

Branch: refs/heads/ignite-389-ipc
Commit: 39ce1cbfe190a709c6a2711e42160727fb01ce02
Parents: 104a13f
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 14:37:42 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 14:37:42 2015 +0300

----------------------------------------------------------------------
 .../loadtests/communication/GridIoManagerBenchmark0.java      | 1 +
 .../ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java  | 7 +++++++
 .../ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java   | 7 +++++++
 3 files changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39ce1cbf/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index 422d608..ea5b716 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -455,6 +455,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
 
         spi.setTcpNoDelay(true);
         spi.setConnectionBufferSize(0);
+        spi.setSharedMemoryPort(-1);
 
         info("Comm SPI: " + spi);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39ce1cbf/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
index 7cea968..a89e586 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.v2.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -197,6 +198,12 @@ public class IgniteHadoopFileSystemHandshakeSelfTest extends IgfsCommonAbstractT
 
         cfg.setDiscoverySpi(discoSpi);
 
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
         CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
 
         metaCacheCfg.setName("replicated");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39ce1cbf/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
index 2c17ba9..6773366 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -86,6 +87,12 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe
 
         cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
 
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
         cnt++;
 
         return cfg;


[12/29] incubator-ignite git commit: #sprint-5 - Removed dumpStack

Posted by ag...@apache.org.
#sprint-5 - Removed dumpStack


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bd3abbc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bd3abbc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bd3abbc2

Branch: refs/heads/ignite-389-ipc
Commit: bd3abbc277d1dcfea7bad21ec1748ed5672ef22e
Parents: 97d0bc1
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Jun 3 18:54:31 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Jun 3 18:54:31 2015 -0700

----------------------------------------------------------------------
 .../apache/ignite/internal/processors/cache/GridCacheContext.java | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bd3abbc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index b20e59d..46db7c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1469,9 +1469,6 @@ public class GridCacheContext<K, V> implements Externalizable {
             Collection<ClusterNode> dhtNodeIds = new ArrayList<>(dhtRemoteNodes);
             Collection<ClusterNode> nearNodeIds = F.isEmpty(nearRemoteNodes) ? null : new ArrayList<>(nearRemoteNodes);
 
-            if (!F.isEmpty(nearNodeIds))
-                U.dumpStack("Added near mapped nodes: " + entry + ", " + nearNodeIds);
-
             entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds);
         }
 


[25/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-991' into ignite-sprint-5

Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-991' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a03d111f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a03d111f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a03d111f

Branch: refs/heads/ignite-389-ipc
Commit: a03d111f26fd16faf0629bc11fe2ef201dee92f3
Parents: bf3203a b5ee09f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 14:14:29 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 14:14:29 2015 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |  2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  8 +-
 .../GridDhtPartitionsExchangeFuture.java        | 19 +++-
 ...niteDynamicCacheWithConfigStartSelfTest.java | 97 ++++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |  1 +
 5 files changed, 118 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03d111f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03d111f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------


[24/29] incubator-ignite git commit: # ignite-981

Posted by ag...@apache.org.
# ignite-981


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a6ea325c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a6ea325c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a6ea325c

Branch: refs/heads/ignite-389-ipc
Commit: a6ea325cf751cd9197eb80230cedc76a6a30daa4
Parents: ddcb2a3
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 13:41:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 13:41:59 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsClientCacheSelfTest.java   | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6ea325c/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
index 02166c4..9cda1b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
@@ -85,13 +85,16 @@ public class IgfsClientCacheSelfTest extends IgfsAbstractSelfTest {
         cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME),
             cacheConfiguration(CACHE_NAME));
 
-        if (!gridName.equals(getTestGridName(0)))
-            cfg.setClientMode(true);
-
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
 
         disco.setIpFinder(IP_FINDER);
 
+        if (!gridName.equals(getTestGridName(0))) {
+            cfg.setClientMode(true);
+
+            disco.setForceServerMode(true);
+        }
+
         cfg.setDiscoverySpi(disco);
 
         FileSystemConfiguration igfsCfg = new FileSystemConfiguration();


[27/29] incubator-ignite git commit: [IGNITE-218]: Wrong staging permissions while running MR job under hadoop accelerator

Posted by ag...@apache.org.
[IGNITE-218]: Wrong staging permissions while running MR job under hadoop accelerator


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f72917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f72917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f72917

Branch: refs/heads/ignite-389-ipc
Commit: c9f72917843092d596044197cf7cb05c56a13fca
Parents: 20e5677
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jun 4 18:20:24 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jun 4 18:20:24 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopTaskContext.java    |  14 +-
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   2 +-
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  14 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  70 ++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   2 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |   2 +-
 .../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
 .../hadoop/SecondaryFileSystemProvider.java     |   4 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  20 +-
 .../processors/hadoop/v2/HadoopV2Job.java       |  31 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  26 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  48 +++-
 .../hadoop/HadoopClientProtocolSelfTest.java    |   6 +-
 .../hadoop/HadoopAbstractSelfTest.java          |  14 +-
 .../hadoop/HadoopCommandLineTest.java           |  14 +-
 .../processors/hadoop/HadoopMapReduceTest.java  | 176 +++++++++++-
 .../hadoop/HadoopTaskExecutionSelfTest.java     |   2 +-
 .../hadoop/HadoopTasksAllVersionsTest.java      |  15 +-
 .../processors/hadoop/HadoopTasksV1Test.java    |   5 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |   5 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   6 +-
 .../collections/HadoopAbstractMapTest.java      |  12 +
 22 files changed, 643 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..3d2ee17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,13 +21,14 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Task context.
  */
 public abstract class HadoopTaskContext {
     /** */
-    private final HadoopJob job;
+    protected final HadoopJob job;
 
     /** */
     private HadoopTaskInput input;
@@ -187,4 +188,15 @@ public abstract class HadoopTaskContext {
      * @throws IgniteCheckedException If failed.
      */
     public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+    /**
+     * Executes a callable on behalf of the job owner.
+     * In case of embedded task execution the implementation of this method
+     * will use classes loaded by the ClassLoader this HadoopTaskContext loaded with.
+     * @param c The callable.
+     * @param <T> The return type of the Callable.
+     * @return The result of the callable.
+     * @throws IgniteCheckedException On any error in callable.
+     */
+    public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index b8095b8..44ee90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -121,6 +121,6 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteException {
-        igfs.stop(true);
+        // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 66e9761..d910507 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 
 import java.io.*;
@@ -37,9 +39,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
 
     /** */
-    private static final String DEFAULT_USER_NAME = "anonymous";
-
-    /** */
     public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
 
     /** */
@@ -52,15 +51,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
         throws IgniteCheckedException {
 
-        Configuration hadoopCfg = new Configuration();
+        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
 
         for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
             hadoopCfg.set(e.getKey(), e.getValue());
 
         String user = jobInfo.user();
 
-        if (F.isEmpty(user))
-            user = DEFAULT_USER_NAME;
+        user = IgfsUtils.fixUserName(user);
 
         String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
 
@@ -72,7 +70,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
         HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
 
         try {
-            FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index c0a9ade..9d94e5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
@@ -144,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Custom-provided sequential reads before prefetch. */
     private int seqReadsBeforePrefetch;
 
-    /** The cache was disabled when the instance was creating. */
-    private boolean cacheEnabled;
-
     /** {@inheritDoc} */
     @Override public URI getUri() {
         if (uri == null)
@@ -173,27 +169,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
-     * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+     * Gets non-null user name as per the Hadoop file system viewpoint.
      * @return the user name, never null.
      */
-    public static String getFsHadoopUser(Configuration cfg) throws IOException {
-        String user = null;
-
-        // -------------------------------------------
-        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
-        // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
-        // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
-        // ugi.doAs() closure.
-        if (cfg != null)
-            user = cfg.get(MRJobConfig.USER_NAME);
-        // -------------------------------------------
-
-        if (user == null) {
-            UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
-
-            if (currUgi != null)
-                user = currUgi.getShortUserName();
-        }
+    public static String getFsHadoopUser() throws IOException {
+        UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+        String user = currUgi.getShortUserName();
 
         user = IgfsUtils.fixUserName(user);
 
@@ -228,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             setConf(cfg);
 
-            String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
-
-            cacheEnabled = !cfg.getBoolean(disableCacheName, false);
-
             mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
 
             if (!IGFS_SCHEME.equals(name.getScheme()))
@@ -242,7 +220,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             uriAuthority = uri.getAuthority();
 
-            user = getFsHadoopUser(cfg);
+            user = getFsHadoopUser();
 
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -360,15 +338,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override protected void finalize() throws Throwable {
         super.finalize();
 
-        close0();
+        close();
     }
 
     /** {@inheritDoc} */
     @Override public void close() throws IOException {
-        if (cacheEnabled && get(getUri(), getConf()) == this)
-            return;
-
-        close0();
+        if (closeGuard.compareAndSet(false, true))
+            close0();
     }
 
     /**
@@ -377,27 +353,25 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @throws IOException If failed.
      */
     private void close0() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (LOG.isDebugEnabled())
-                LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+        if (LOG.isDebugEnabled())
+            LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
 
-            if (rmtClient == null)
-                return;
+        if (rmtClient == null)
+            return;
 
-            super.close();
+        super.close();
 
-            rmtClient.close(false);
+        rmtClient.close(false);
 
-            if (clientLog.isLogEnabled())
-                clientLog.close();
+        if (clientLog.isLogEnabled())
+            clientLog.close();
 
-            if (secondaryFs != null)
-                U.closeQuiet(secondaryFs);
+        if (secondaryFs != null)
+            U.closeQuiet(secondaryFs);
 
-            // Reset initialized resources.
-            uri = null;
-            rmtClient = null;
-        }
+        // Reset initialized resources.
+        uri = null;
+        rmtClient = null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index f3fbe9c..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         uri = name;
 
-        user = getFsHadoopUser(cfg);
+        user = getFsHadoopUser();
 
         try {
             initialize(name, cfg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
             if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
                 synchronized (HadoopDefaultJobInfo.class) {
                     if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
 
                         jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index d493bd4..68a9ef6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.net.*;
 import java.util.*;
 
 /**
@@ -57,6 +63,41 @@ public class HadoopUtils {
     /** Old reducer class attribute. */
     private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
 
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+            @Override public FileSystem createValue(FsCacheKey key) {
+                try {
+                    assert key != null;
+
+                    // Explicitly disable FileSystem caching:
+                    URI uri = key.uri();
+
+                    String scheme = uri.getScheme();
+
+                    // Copy the configuration to avoid altering the external object.
+                    Configuration cfg = new Configuration(key.configuration());
+
+                    String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+                    cfg.setBoolean(prop, true);
+
+                    return FileSystem.get(uri, cfg, key.user());
+                }
+                catch (IOException | InterruptedException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
+    /**
+     * Constructor.
+     */
+    private HadoopUtils() {
+        // No-op.
+    }
+
     /**
      * Wraps native split.
      *
@@ -126,8 +167,6 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
-                // See https://issues.apache.org/jira/browse/IGNITE-764
                 setupProgress = 1;
                 mapProgress = 1;
 
@@ -304,9 +343,242 @@ public class HadoopUtils {
     }
 
     /**
-     * Constructor.
+     * Creates {@link Configuration} in a correct class loader context to avoid caching
+     * of inappropriate class loader in the Configuration object.
+     * @return New instance of {@link Configuration}.
      */
-    private HadoopUtils() {
-        // No-op.
+    public static Configuration safeCreateConfiguration() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+
+        try {
+            return new Configuration();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Creates {@link JobConf} in a correct class loader context to avoid caching
+     * of inappropriate class loader in the Configuration object.
+     * @return New instance of {@link JobConf}.
+     */
+    public static JobConf safeCreateJobConf() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+
+        try {
+            return new JobConf();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Gets non-null user name as per the Hadoop viewpoint.
+     * @param cfg the Hadoop job configuration, may be null.
+     * @return the user name, never null.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It creates the filesystem for the user specified in the
+     * configuration with {@link MRJobConfig#USER_NAME} property.
+     * @param uri the file system uri.
+     * @param cfg the configuration.
+     * @return the file system
+     * @throws IOException
+     */
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        if (doCacheFs) {
+            try {
+                fs = getWithCaching(uri, cfg, usr);
+            }
+            catch (IgniteException ie) {
+                throw new IOException(ie);
+            }
+        }
+        else {
+            try {
+                fs = FileSystem.get(uri, cfg, usr);
+            }
+            catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+
+                throw new IOException(ie);
+            }
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * Note that configuration is not a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** */
+        private final URI uri;
+
+        /** */
+        private final String usr;
+
+        /** */
+        private final String equalityKey;
+
+        /** */
+        private final Configuration cfg;
+
+        /**
+         * Constructor
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates String key used for equality and hashing.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase());
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase());
+
+            return sb.toString();
+        }
+
+        /**
+         * The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * The User.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * The Configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+
+    /**
+     * Gets FileSystem caching it in static Ignite cache. The cache is a singleton
+     * for each class loader.
+     *
+     * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}.
+     * The Configuration is not a part of the key. This means that for the given key file system is
+     * initialized only once with the Configuration passed in upon the file system creation.
+     *
+     * @param uri The file system URI.
+     * @param cfg The configuration.
+     * @param usr The user to create file system for.
+     * @return The file system: either created, or taken from the cache.
+     */
+    private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+        FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+        return fileSysLazyMap.getOrCreate(key);
+    }
+
+    /**
+     * Gets the property name to disable file system cache.
+     * @param scheme The file system URI scheme.
+     * @return The property name. If scheme is null,
+     * returns "fs.null.impl.disable.cache".
+     */
+    public static String disableFsCachePropertyName(@Nullable String scheme) {
+        return String.format("fs.%s.impl.disable.cache", scheme);
+    }
+
+    /**
+     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    public static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index b1a057c..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -34,7 +34,7 @@ import java.security.*;
  */
 public class SecondaryFileSystemProvider {
     /** Configuration of the secondary filesystem, never null. */
-    private final Configuration cfg = new Configuration();
+    private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
 
     /** The secondary filesystem URI, never null. */
     private final URI uri;
@@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider {
         }
 
         // Disable caching:
-        String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+        String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
 
         cfg.setBoolean(prop, true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 2e04ac1..b170125 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
 
     /** {@inheritDoc} */
     @Override public Void call() throws IgniteCheckedException {
+        ctx = job.getTaskContext(info);
+
+        return ctx.runAsJobOwner(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                call0();
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Implements actual task running.
+     * @throws IgniteCheckedException
+     */
+    void call0() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
 
         Throwable err = null;
@@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
         HadoopPerformanceCounter perfCntr = null;
 
         try {
-            ctx = job.getTaskContext(info);
-
             perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
 
             perfCntr.onTaskSubmit(info, submitTs);
@@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
             if (ctx != null)
                 ctx.cleanupTaskEnvironment();
         }
-
-        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index d265ca8..d754039 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.hadoop.v2;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.JobID;
@@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob {
         new ConcurrentHashMap8<>();
 
     /** Pooling task context class and thus class loading environment. */
-    private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
 
     /** All created contexts. */
     private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
@@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob {
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
-        HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
-
-        // Before create JobConf instance we should set new context class loader.
-        Thread.currentThread().setContextClassLoader(clsLdr);
-
-        jobConf = new JobConf();
+        jobConf = HadoopUtils.safeCreateJobConf();
 
         HadoopFileSystemsUtils.setupFileSystems(jobConf);
 
@@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob {
 
             Path jobDir = new Path(jobDirPath);
 
-            try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+            try {
+                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true);
+
                 JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
 
@@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob {
         if (old != null)
             return old.get();
 
-        Class<?> cls = taskCtxClsPool.poll();
+        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
 
         try {
             if (cls == null) {
@@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob {
                 // Note that the classloader identified by the task it was initially created for,
                 // but later it may be reused for other tasks.
                 HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
-                    "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
+                    "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
 
-                cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+                cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 
                 fullCtxClsQueue.add(cls);
             }
@@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob {
 
     /** {@inheritDoc} */
     @Override public void cleanupStagingDirectory() {
-        if (rsrcMgr != null)
-            rsrcMgr.cleanupStagingDirectory();
+        rsrcMgr.cleanupStagingDirectory();
+    }
+
+    /**
+     * Getter for job configuration.
+     * @return The job configuration.
+     */
+    public JobConf jobConf() {
+        return jobConf;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 6f6bfa1..2f64e77 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -40,6 +40,9 @@ import java.util.*;
  * files are needed to be placed on local files system.
  */
 public class HadoopV2JobResourceManager {
+    /** File type Fs disable caching property name. */
+    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file");
+
     /** Hadoop job context. */
     private final JobContextImpl ctx;
 
@@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager {
         try {
             cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
 
-            if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+            if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
                 FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
         }
         finally {
@@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
+                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true);
 
                     if (!fs.exists(stagingDir))
-                        throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
-                            stagingDir);
+                        throw new IgniteCheckedException("Failed to find map-reduce submission " +
+                            "directory (does not exist): " + stagingDir);
 
                     if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                        throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
-                            "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                        throw new IgniteCheckedException("Failed to copy job submission directory "
+                            + "contents to local file system "
+                            + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+                            + ", jobId=" + jobId + ']');
                 }
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
@@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager {
                 }
             }
             else if (!jobLocDir.mkdirs())
-                throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+                throw new IgniteCheckedException("Failed to create local job directory: "
+                    + jobLocDir.getAbsolutePath());
 
             setLocalFSWorkingDirectory(jobLocDir);
         }
@@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            FileSystem srcFs = srcPath.getFileSystem(cfg);
+            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
 
                 if (!archivesPath.exists() && !archivesPath.mkdir())
                     throw new IOException("Failed to create directory " +
-                         "[path=" + archivesPath + ", jobId=" + jobId + ']');
+                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
 
                 File archiveFile = new File(archivesPath, locName);
 
@@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null)
-                stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true);
+                HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true);
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index dd18c66..e89feba 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.security.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -419,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+        try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false);
             FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());
@@ -448,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
             throw new IgniteCheckedException(e);
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+        String user = job.info().user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        String ugiUser;
+
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            assert currUser != null;
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // if current UGI context user is the same, do direct call:
+                return c.call();
+            else {
+                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                });
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.hadoop.mapreduce.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
      * @return Configuration.
      */
     private Configuration config(int port) {
-        Configuration conf = new Configuration();
+        Configuration conf = HadoopUtils.safeCreateConfiguration();
 
         setupFileSystems(conf);
 
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
             ctx.getCounter(TestCounter.COUNTER2).increment(1);
 
             int sum = 0;
-            for (IntWritable value : values) {
+            for (IntWritable value : values)
                 sum += value.get();
-            }
 
             ctx.write(key, new IntWritable(sum));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..e8a0a6f 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /** Initial REST port. */
     private int restPort = REST_PORT;
 
+    /** Secondary file system REST endpoint configuration. */
+    protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+    static {
+        SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+        SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+        SECONDARY_REST_CFG.setPort(11500);
+    }
+
+
     /** Initial classpath. */
     private static String initCp;
 
@@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /**
      * @return IGFS configuration.
      */
-    public FileSystemConfiguration igfsConfiguration() {
+    public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = new FileSystemConfiguration();
 
         cfg.setName(igfsName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
index d10ee5c..c66cdf3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import com.google.common.base.*;
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.resource.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.jsr166.*;
 
@@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
+        String cfgPath = "config/hadoop/default-config.xml";
+
+        IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
+
+        IgniteConfiguration cfg = tup.get1();
+
+        cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
+
+        igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 8a3a0ac..a1ef7ba 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
 public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+    /** IGFS block size. */
+    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+    /** Amount of blocks to prefetch. */
+    protected static final int PREFETCH_BLOCKS = 1;
+
+    /** Amount of sequential block reads before prefetch is triggered. */
+    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+    /** Secondary file system URI. */
+    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+    /** Secondary file system configuration path. */
+    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+    /** The user to run Hadoop job on behalf of. */
+    protected static final String USER = "vasya";
+
+    /** Secondary IGFS name. */
+    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+    /** The secondary Ignite node. */
+    protected Ignite igniteSecondary;
+
+    /** The secondary Fs. */
+    protected IgfsSecondaryFileSystem secondaryFs;
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 3;
     }
 
     /**
+     * Gets owner of a IgfsEx path.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwner(IgfsEx i, IgfsPath p) {
+        return i.info(p).property(IgfsEx.PROP_USER_NAME);
+    }
+
+    /**
+     * Gets owner of a secondary Fs path.
+     * @param secFs The sec Fs.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+            @Override public String apply() {
+                return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+            }
+        });
+    }
+
+    /**
+     * Checks owner of the path.
+     * @param p The path.
+     */
+    private void checkOwner(IgfsPath p) {
+        String ownerPrim = getOwner(igfs, p);
+        assertEquals(USER, ownerPrim);
+
+        String ownerSec = getOwnerSecondary(secondaryFs, p);
+        assertEquals(USER, ownerSec);
+    }
+
+    /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
      */
@@ -71,7 +144,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
             JobConf jobConf = new JobConf();
 
             jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
-            jobConf.setUser("yyy");
+            jobConf.setUser(USER);
             jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
 
             //To split into about 40 items for v2
@@ -105,14 +178,20 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             checkJobStatistics(jobId);
 
+            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+            checkOwner(new IgfsPath(outFile));
+
             assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
-                useNewReducer,
+                    useNewReducer,
                 "blue\t200000\n" +
-                "green\t150000\n" +
-                "red\t100000\n" +
-                "yellow\t70000\n",
-                readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
-            );
+                    "green\t150000\n" +
+                    "red\t100000\n" +
+                    "yellow\t70000\n",
+                readAndSortFile(outFile)
+                );
         }
     }
 
@@ -182,7 +261,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
             }
         }
 
-        final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
 
         assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
@@ -212,4 +291,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
                 ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
         }
     }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
+
+        super.beforeTest();
+    }
+
+    /**
+     * Start grid with IGFS.
+     *
+     * @param gridName Grid name.
+     * @param igfsName IGFS name
+     * @param mode IGFS mode.
+     * @param secondaryFs Secondary file system (optional).
+     * @param restCfg Rest configuration string (optional).
+     * @return Started grid instance.
+     * @throws Exception If failed.
+     */
+    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(igfsName);
+        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+        igfsCfg.setDefaultMode(mode);
+        igfsCfg.setIpcEndpointConfiguration(restCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setNearConfiguration(null);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        return G.start(cfg);
+    }
+
+    /**
+     * @return IGFS configuration.
+     */
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+        FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+        fsCfg.setSecondaryFileSystem(secondaryFs);
+
+        return fsCfg;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
index 8dc9830..eee5c8b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
 
     /** {@inheritDoc} */
-    @Override public FileSystemConfiguration igfsConfiguration() {
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = super.igfsConfiguration();
 
         cfg.setFragmentizerEnabled(false);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
index aaf0f92..6930020 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.net.*;
@@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+    public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
 
     /**
      * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
                 igfs.info(inFile).length() - fileBlock1.length());
 
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
 
@@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+    private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
         int taskNum, String... words) throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
 
@@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testReduceTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
 
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testCombinerTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob("/", "/");
+        HadoopJob gridJob = getHadoopJob("/", "/");
 
         HadoopTestTaskContext ctx =
             runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
@@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context of combine task with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
         throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
 
@@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
 
-        HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index b41a260..48e83cc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.util.*;
@@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
 
         setupFileSystems(jobConf);
@@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index b677c63..e73fae3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.util.*;
 
@@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws Exception if fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         Job job = Job.getInstance();
 
         job.setOutputKeyClass(Text.class);
@@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index ebc89f4..f3b9307 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
         cfg.setMapOutputValueClass(Text.class);
         cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
-        HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+        HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+        HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+
+        HadoopJob job = info.createJob(id, log);
 
         HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
             null));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index b4ed5e1..9395c5e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Abstract class for maps test.
@@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
             assert false;
         }
 
+        /** {@inheritDoc} */
         @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
             assert false;
         }
+
+        /** {@inheritDoc} */
+        @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
+            try {
+                return c.call();
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
     }
 
     /**


[04/29] incubator-ignite git commit: # ignite-970

Posted by ag...@apache.org.
# ignite-970


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7158fb6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7158fb6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7158fb6a

Branch: refs/heads/ignite-389-ipc
Commit: 7158fb6a4ff7b9db3afda73f75376ad3285c556c
Parents: d6f9b64
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 15:57:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 15:57:16 2015 +0300

----------------------------------------------------------------------
 .../GridTcpCommunicationSpiMultithreadedSelfTest.java   | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7158fb6a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 5d25299..dc7f344 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -62,8 +62,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
 
     /** SPIs */
-    private static final Map<UUID, CommunicationSpi<Message>> spis =
-        new ConcurrentHashMap<>();
+    private static final Map<UUID, CommunicationSpi<Message>> spis = new ConcurrentHashMap<>();
 
     /** Listeners. */
     private static final Map<UUID, MessageListener> lsnrs = new HashMap<>();
@@ -85,13 +84,20 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     /**
      * @param useShmem Use shared mem.
      */
-    public GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
+    protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
         super(false);
 
         this.useShmem = useShmem;
     }
 
     /**
+     *
+     */
+    public GridTcpCommunicationSpiMultithreadedSelfTest() {
+        this(false);
+    }
+
+    /**
      * Accumulating listener.
      */
     @SuppressWarnings({"deprecation"})


[18/29] incubator-ignite git commit: # ignite-sprint-5

Posted by ag...@apache.org.
# ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7501025d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7501025d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7501025d

Branch: refs/heads/ignite-389-ipc
Commit: 7501025d815a61ef881d86fa326bc6e17064ec0e
Parents: ae5189a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 11:01:15 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 11:01:15 2015 +0300

----------------------------------------------------------------------
 .../src/main/java/org/apache/ignite/internal/IgniteKernal.java    | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7501025d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a7dc70..a0d97c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -963,8 +963,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                             sysPoolQSize = exec.getQueue().size();
                         }
 
+                        String id = U.id8(localNode().id());
+
                         String msg = NL +
                             "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+                            "    ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
                             "    ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
                             "    ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
                                 dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +


[19/29] incubator-ignite git commit: Merge branch 'ignite-988' into ignite-sprint-5

Posted by ag...@apache.org.
Merge branch 'ignite-988' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e625709b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e625709b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e625709b

Branch: refs/heads/ignite-389-ipc
Commit: e625709b2f74853ef883df6cafa46b8a2b0245f7
Parents: ae5189a 46eab5b
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:06:25 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:06:25 2015 +0700

----------------------------------------------------------------------
 .../java/org/apache/ignite/internal/visor/query/VisorQueryJob.java | 2 +-
 .../apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[29/29] incubator-ignite git commit: IGNITE-389 - IPC checked and API improvements.

Posted by ag...@apache.org.
IGNITE-389 - IPC checked and API improvements.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b51f99e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b51f99e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b51f99e

Branch: refs/heads/ignite-389-ipc
Commit: 6b51f99e72eb11af25403f8ec50087c03b1f1fb7
Parents: 1d8643c
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 4 19:19:36 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 4 19:19:36 2015 -0700

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |   4 +-
 .../shmem/IpcSharedMemoryClientEndpoint.java    |   2 +-
 .../ipc/shmem/IpcSharedMemoryNativeLoader.java  | 150 +++++++++++++++++--
 .../shmem/IpcSharedMemoryServerEndpoint.java    |   2 +-
 .../IpcSharedMemoryCrashDetectionSelfTest.java  |   2 +-
 .../ipc/shmem/IpcSharedMemorySpaceSelfTest.java |   2 +-
 .../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java |   2 +-
 .../LoadWithCorruptedLibFileTestRunner.java     |   2 +-
 .../IpcSharedMemoryBenchmarkReader.java         |   2 +-
 .../IpcSharedMemoryBenchmarkWriter.java         |   2 +-
 .../hadoop/HadoopAbstractSelfTest.java          |   1 +
 .../org/apache/ignite/spark/IgniteContext.scala |  19 ++-
 .../org/apache/ignite/spark/IgniteRDD.scala     |   8 +-
 13 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 0932212..9016b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9025,11 +9025,11 @@ public abstract class IgniteUtils {
                 hasShmem = false;
             else {
                 try {
-                    IpcSharedMemoryNativeLoader.load();
+                    IpcSharedMemoryNativeLoader.load(null);
 
                     hasShmem = true;
                 }
-                catch (IgniteCheckedException e) {
+                catch (IgniteCheckedException ignore) {
                     hasShmem = false;
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
index 27a234f..c935c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
         boolean clear = true;
 
         try {
-            IpcSharedMemoryNativeLoader.load();
+            IpcSharedMemoryNativeLoader.load(log);
 
             sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
index dc00ca6..8c345f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.ipc.shmem;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
@@ -25,6 +26,8 @@ import java.net.*;
 import java.nio.channels.*;
 import java.security.*;
 import java.util.*;
+import java.util.jar.*;
+import java.util.zip.*;
 
 import static org.apache.ignite.internal.IgniteVersionUtils.*;
 
@@ -36,6 +39,9 @@ public class IpcSharedMemoryNativeLoader {
     /** Library name base. */
     private static final String LIB_NAME_BASE = "igniteshmem";
 
+    /** Library jar name base. */
+    private static final String JAR_NAME_BASE = "shmem";
+
     /** Library name. */
     static final String LIB_NAME = LIB_NAME_BASE + "-" + VER_STR;
 
@@ -84,9 +90,10 @@ public class IpcSharedMemoryNativeLoader {
     }
 
     /**
+     * @param log Logger, if available. If null, warnings will be printed out to console.
      * @throws IgniteCheckedException If failed.
      */
-    public static void load() throws IgniteCheckedException {
+    public static void load(IgniteLogger log) throws IgniteCheckedException {
         if (loaded)
             return;
 
@@ -94,7 +101,7 @@ public class IpcSharedMemoryNativeLoader {
             if (loaded)
                 return;
 
-            doLoad();
+            doLoad(log);
 
             loaded = true;
         }
@@ -103,7 +110,7 @@ public class IpcSharedMemoryNativeLoader {
     /**
      * @throws IgniteCheckedException If failed.
      */
-    private static void doLoad() throws IgniteCheckedException {
+    private static void doLoad(IgniteLogger log) throws IgniteCheckedException {
         assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class);
 
         Collection<Throwable> errs = new ArrayList<>();
@@ -124,7 +131,7 @@ public class IpcSharedMemoryNativeLoader {
 
         // Obtain lock on file to prevent concurrent extracts.
         try (RandomAccessFile randomAccessFile = new RandomAccessFile(lockFile, "rws");
-             FileLock ignored = randomAccessFile.getChannel().lock()) {
+            FileLock ignored = randomAccessFile.getChannel().lock()) {
             if (extractAndLoad(errs, tmpDir, platformSpecificResourcePath()))
                 return;
 
@@ -134,6 +141,30 @@ public class IpcSharedMemoryNativeLoader {
             if (extractAndLoad(errs, tmpDir, resourcePath()))
                 return;
 
+            try {
+                U.quietAndWarn(log, "Failed to load 'igniteshmem' library from classpath. Will try to load it from IGNITE_HOME.");
+
+                String igniteHome = X.resolveIgniteHome();
+
+                File shmemJar = findShmemJar(errs, igniteHome);
+
+                if (shmemJar != null) {
+                    try (JarFile jar = new JarFile(shmemJar, false, JarFile.OPEN_READ)) {
+                        if (extractAndLoad(errs, jar, tmpDir, platformSpecificResourcePath()))
+                            return;
+
+                        if (extractAndLoad(errs, jar, tmpDir, osSpecificResourcePath()))
+                            return;
+
+                        if (extractAndLoad(errs, jar, tmpDir, resourcePath()))
+                            return;
+                    }
+                }
+            }
+            catch (IgniteCheckedException ignore) {
+
+            }
+
             // Failed to find the library.
             assert !errs.isEmpty();
 
@@ -145,6 +176,32 @@ public class IpcSharedMemoryNativeLoader {
     }
 
     /**
+     * Tries to find shmem jar in IGNITE_HOME/libs folder.
+     *
+     * @param errs Collection of errors to add readable exception to.
+     * @param igniteHome Resolver IGNITE_HOME variable.
+     * @return File, if found.
+     */
+    private static File findShmemJar(Collection<Throwable> errs, String igniteHome) {
+        File libs = new File(igniteHome, "libs");
+
+        if (!libs.exists() || libs.isFile()) {
+            errs.add(new IllegalStateException("Failed to find libs folder in resolved IGNITE_HOME: " + igniteHome));
+
+            return null;
+        }
+
+        for (File lib : libs.listFiles()) {
+            if (lib.getName().endsWith(".jar") && lib.getName().contains(JAR_NAME_BASE))
+                return lib;
+        }
+
+        errs.add(new IllegalStateException("Failed to find shmem jar in resolved IGNITE_HOME: " + igniteHome));
+
+        return null;
+    }
+
+    /**
      * Gets temporary directory unique for each OS user.
      * The directory guaranteed to exist, though may not be empty.
      */
@@ -220,6 +277,24 @@ public class IpcSharedMemoryNativeLoader {
 
     /**
      * @param errs Errors collection.
+     * @param rsrcPath Path.
+     * @return {@code True} if library was found and loaded.
+     */
+    private static boolean extractAndLoad(Collection<Throwable> errs, JarFile jar, File tmpDir, String rsrcPath) {
+        ZipEntry rsrc = jar.getEntry(rsrcPath);
+
+        if (rsrc != null)
+            return extract(errs, rsrc, jar, new File(tmpDir, mapLibraryName(LIB_NAME)));
+        else {
+            errs.add(new IllegalStateException("Failed to find resource within specified jar file " +
+                "[rsrc=" + rsrcPath + ", jar=" + jar.getName() + ']'));
+
+            return false;
+        }
+    }
+
+    /**
+     * @param errs Errors collection.
      * @param src Source.
      * @param target Target.
      * @return {@code True} if resource was found and loaded.
@@ -230,7 +305,7 @@ public class IpcSharedMemoryNativeLoader {
         InputStream is = null;
 
         try {
-            if (!target.exists() || !haveEqualMD5(target, src)) {
+            if (!target.exists() || !haveEqualMD5(target, src.openStream())) {
                 is = src.openStream();
 
                 if (is != null) {
@@ -265,20 +340,69 @@ public class IpcSharedMemoryNativeLoader {
     }
 
     /**
-     * @param target Target.
+     * @param errs Errors collection.
      * @param src Source.
+     * @param target Target.
+     * @return {@code True} if resource was found and loaded.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private static boolean extract(Collection<Throwable> errs, ZipEntry src, JarFile jar, File target) {
+        FileOutputStream os = null;
+        InputStream is = null;
+
+        try {
+            if (!target.exists() || !haveEqualMD5(target, jar.getInputStream(src))) {
+                is = jar.getInputStream(src);
+
+                if (is != null) {
+                    os = new FileOutputStream(target);
+
+                    int read;
+
+                    byte[] buf = new byte[4096];
+
+                    while ((read = is.read(buf)) != -1)
+                        os.write(buf, 0, read);
+                }
+            }
+
+            // chmod 775.
+            if (!U.isWindows())
+                Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor();
+
+            System.load(target.getPath());
+
+            return true;
+        }
+        catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) {
+            errs.add(e);
+        }
+        finally {
+            U.closeQuiet(os);
+            U.closeQuiet(is);
+        }
+
+        return false;
+    }
+
+    /**
+     * @param target Target.
+     * @param srcIS Source input stream.
      * @return {@code True} if target md5-sum equal to source md5-sum.
      * @throws NoSuchAlgorithmException If md5 algorithm was not found.
      * @throws IOException If an I/O exception occurs.
      */
-    private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException {
-        try (InputStream targetIS = new FileInputStream(target);
-             InputStream srcIS = src.openStream()) {
-
-            String targetMD5 = U.calculateMD5(targetIS);
-            String srcMD5 = U.calculateMD5(srcIS);
+    private static boolean haveEqualMD5(File target, InputStream srcIS) throws NoSuchAlgorithmException, IOException {
+        try {
+            try (InputStream targetIS = new FileInputStream(target)) {
+                String targetMD5 = U.calculateMD5(targetIS);
+                String srcMD5 = U.calculateMD5(srcIS);
 
-            return targetMD5.equals(srcMD5);
+                return targetMD5.equals(srcMD5);
+            }
+        }
+        finally {
+            srcIS.close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 5185856..102c5b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -146,7 +146,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log);
 
         pid = IpcSharedMemoryUtils.pid();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
index 2ddf6f3..c6f590e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
index 7dc0870..4afb64b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
@@ -51,7 +51,7 @@ public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
index 4c5413c..176429e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
@@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
index 8ff827b..8fee239 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
@@ -37,7 +37,7 @@ public class LoadWithCorruptedLibFileTestRunner {
 
         createCorruptedLibFile();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
index 28495af..89eeda1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
@@ -43,7 +43,7 @@ public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkP
      * @throws IgniteCheckedException If failed.
      */
     public static void main(String[] args) throws IgniteCheckedException {
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(null);
 
         int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
index 2ade145..e8a8402 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkP
      * @throws IgniteCheckedException If failed.
      */
     public static void main(String[] args) throws IgniteCheckedException {
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(null);
 
         int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index 517a587..a3c9bde 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 5cdbad0..2cfebd6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -34,8 +34,23 @@ import org.apache.spark.sql.SQLContext
  */
 class IgniteContext[K, V](
     @scala.transient val sparkContext: SparkContext,
-    cfgF: () ⇒ IgniteConfiguration
+    cfgF: () ⇒ IgniteConfiguration,
+    client: Boolean = true
 ) extends Serializable {
+    @scala.transient private val driver = true
+
+    if (!client) {
+        val workers = sparkContext.getExecutorStorageStatus.length - 1
+
+        if (workers <= 0)
+            throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
+
+        println("Will start Ignite nodes on " + workers + " workers")
+
+        // Start ignite server node on each worker in server mode.
+        sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
+    }
+
     def this(
         sc: SparkContext,
         springUrl: String
@@ -62,7 +77,7 @@ class IgniteContext[K, V](
         catch {
             case e: Exception ⇒
                 try {
-                    igniteCfg.setClientMode(true)
+                    igniteCfg.setClientMode(client || driver)
 
                     Ignition.start(igniteCfg)
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0b8e845..0d1a3be 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -114,7 +114,7 @@ class IgniteRDD[K, V] (
         ic.sqlContext.createDataFrame(rowRdd, schema)
     }
 
-    def saveValues(rdd: RDD[V]) = {
+    def saveValues(rdd: RDD[V], overwrite: Boolean = false) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()
 
@@ -127,6 +127,8 @@ class IgniteRDD[K, V] (
             val streamer = ig.dataStreamer[Object, V](cacheName)
 
             try {
+                streamer.allowOverwrite(overwrite)
+
                 it.foreach(value ⇒ {
                     val key = affinityKeyFunc(value, node.orNull)
 
@@ -139,7 +141,7 @@ class IgniteRDD[K, V] (
         })
     }
 
-    def savePairs(rdd: RDD[(K, V)]) = {
+    def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()
 
@@ -149,6 +151,8 @@ class IgniteRDD[K, V] (
             val streamer = ig.dataStreamer[K, V](cacheName)
 
             try {
+                streamer.allowOverwrite(overwrite)
+
                 it.foreach(tup ⇒ {
                     streamer.addData(tup._1, tup._2)
                 })


[28/29] incubator-ignite git commit: IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389-ipc

Posted by ag...@apache.org.
IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389-ipc


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1d8643c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1d8643c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1d8643c0

Branch: refs/heads/ignite-389-ipc
Commit: 1d8643c0b93786f7eeff82bb56b64e6df53b3697
Parents: a329e90 c9f7291
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 4 11:09:30 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 4 11:09:30 2015 -0700

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  13 +-
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../managers/communication/GridIoManager.java   | 117 ++++----
 .../processors/cache/GridCacheContext.java      |   3 -
 .../dht/GridClientPartitionTopology.java        |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  16 +-
 .../GridDhtPartitionsExchangeFuture.java        |  29 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../processors/hadoop/HadoopTaskContext.java    |  14 +-
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   2 +-
 .../internal/visor/query/VisorQueryJob.java     |   2 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  31 --
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  56 +++-
 .../tcp/ipfinder/TcpDiscoveryIpFinder.java      |  10 +-
 .../TcpDiscoveryMulticastIpFinder.java          |  47 +++-
 .../cache/IgniteDynamicCacheStartSelfTest.java  |  62 ++++
 ...niteDynamicCacheWithConfigStartSelfTest.java |  35 +--
 .../igfs/IgfsClientCacheSelfTest.java           |   9 +-
 .../IgniteMessagingWithClientTest.java          | 164 +++++++++++
 .../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 .../testsuites/IgniteCacheTestSuite4.java       |   1 +
 .../IgniteSpiDiscoverySelfTestSuite.java        |   1 +
 .../gce/TcpDiscoveryGoogleStorageIpFinder.java  |  43 +--
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  14 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  70 ++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   2 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |   2 +-
 .../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
 .../hadoop/SecondaryFileSystemProvider.java     |   4 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  20 +-
 .../processors/hadoop/v2/HadoopV2Job.java       |  31 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  26 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  48 +++-
 .../hadoop/HadoopClientProtocolSelfTest.java    |   6 +-
 .../hadoop/HadoopAbstractSelfTest.java          |  14 +-
 .../hadoop/HadoopCommandLineTest.java           |  14 +-
 .../processors/hadoop/HadoopMapReduceTest.java  | 176 +++++++++++-
 .../hadoop/HadoopTaskExecutionSelfTest.java     |   2 +-
 .../hadoop/HadoopTasksAllVersionsTest.java      |  15 +-
 .../processors/hadoop/HadoopTasksV1Test.java    |   5 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |   5 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   6 +-
 .../collections/HadoopAbstractMapTest.java      |  12 +
 ...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 .../commands/cache/VisorCacheScanCommand.scala  |   2 +-
 48 files changed, 1358 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d8643c0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d8643c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d8643c0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------


[26/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-981' into ignite-sprint-5

Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-981' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/20e56777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/20e56777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/20e56777

Branch: refs/heads/ignite-389-ipc
Commit: 20e567773b5b0674754a348b6d41d29370f3bd87
Parents: a03d111 a6ea325
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 15:06:44 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 15:06:44 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 117 +++++++------
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../igfs/IgfsClientCacheSelfTest.java           |   9 +-
 .../IgniteMessagingWithClientTest.java          | 164 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 5 files changed, 238 insertions(+), 55 deletions(-)
----------------------------------------------------------------------



[13/29] incubator-ignite git commit: # IGNITE-988. Reworked scan command.

Posted by ag...@apache.org.
# IGNITE-988. Reworked scan command.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46eab5b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46eab5b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46eab5b6

Branch: refs/heads/ignite-389-ipc
Commit: 46eab5b67f1f10260b27a1eb0474904982aa3725
Parents: bd3abbc
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 10:02:02 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 10:02:02 2015 +0700

----------------------------------------------------------------------
 .../java/org/apache/ignite/internal/visor/query/VisorQueryJob.java | 2 +-
 .../apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46eab5b6/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index 4a9daad..8915240 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -65,7 +65,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
         try {
             UUID nid = ignite.localNode().id();
 
-            boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN");
+            boolean scan = arg.queryTxt() == null;
 
             String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" +
                 UUID.randomUUID();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46eab5b6/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
index 4b66720..3aa2a19 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
@@ -139,7 +139,7 @@ class VisorCacheScanCommand {
         val firstPage =
             try
                 executeRandom(groupForDataNode(node, cacheName),
-                    classOf[VisorQueryTask], new VisorQueryArg(cacheName, "SCAN", false, pageSize)) match {
+                    classOf[VisorQueryTask], new VisorQueryArg(cacheName, null, false, pageSize)) match {
                     case x if x.get1() != null =>
                         error(x.get1())
 


[03/29] incubator-ignite git commit: # ignite-970

Posted by ag...@apache.org.
# ignite-970


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d6f9b647
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d6f9b647
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d6f9b647

Branch: refs/heads/ignite-389-ipc
Commit: d6f9b647ab92d822aebbef06315ccb0af41f8238
Parents: 39ce1cb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 15:39:12 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 15:39:12 2015 +0300

----------------------------------------------------------------------
 modules/core/pom.xml                                               | 1 -
 .../tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java          | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d6f9b647/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 370fe69..8c37a4f 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -129,7 +129,6 @@
             <groupId>org.gridgain</groupId>
             <artifactId>ignite-shmem</artifactId>
             <version>1.0.0</version>
-            <scope>test</scope>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d6f9b647/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 9909d76..5d25299 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -85,7 +85,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     /**
      * @param useShmem Use shared mem.
      */
-    protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
+    public GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
         super(false);
 
         this.useShmem = useShmem;


[14/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-991

Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-991


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/38c084a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/38c084a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/38c084a8

Branch: refs/heads/ignite-389-ipc
Commit: 38c084a81850d26e336382822574004b79fce935
Parents: 1de11ff bd3abbc
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:27:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:27:03 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/processors/cache/GridCacheContext.java | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------



[23/29] incubator-ignite git commit: # ignite-981 fixed wait for cache initialization on clients

Posted by ag...@apache.org.
# ignite-981 fixed wait for cache initialization on clients


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ddcb2a3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ddcb2a3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ddcb2a3f

Branch: refs/heads/ignite-389-ipc
Commit: ddcb2a3f6932fe8d3f86d3e1c16a3c4a4610959f
Parents: 1603fe5
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:25:42 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 11:50:36 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 117 +++++++------
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../IgniteMessagingWithClientTest.java          | 164 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 4 files changed, 232 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6e8d457..4382731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1722,68 +1722,83 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return;
             }
 
-            Object msgBody = ioMsg.body();
-
-            assert msgBody != null || ioMsg.bodyBytes() != null;
+            busyLock.readLock();
 
             try {
-                byte[] msgTopicBytes = ioMsg.topicBytes();
-
-                Object msgTopic = ioMsg.topic();
-
-                GridDeployment dep = ioMsg.deployment();
-
-                if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
-                    ioMsg.deploymentClassName() != null) {
-                    dep = ctx.deploy().getGlobalDeployment(
-                        ioMsg.deploymentMode(),
-                        ioMsg.deploymentClassName(),
-                        ioMsg.deploymentClassName(),
-                        ioMsg.userVersion(),
-                        nodeId,
-                        ioMsg.classLoaderId(),
-                        ioMsg.loaderParticipants(),
-                        null);
-
-                    if (dep == null)
-                        throw new IgniteDeploymentCheckedException(
-                            "Failed to obtain deployment information for user message. " +
-                            "If you are using custom message or topic class, try implementing " +
-                            "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
-
-                    ioMsg.deployment(dep); // Cache deployment.
+                if (stopping) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received user message while stopping (will ignore) [nodeId=" +
+                            nodeId + ", msg=" + msg + ']');
+
+                    return;
                 }
 
-                // Unmarshall message topic if needed.
-                if (msgTopic == null && msgTopicBytes != null) {
-                    msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+                Object msgBody = ioMsg.body();
 
-                    ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
-                }
+                assert msgBody != null || ioMsg.bodyBytes() != null;
 
-                if (!F.eq(topic, msgTopic))
-                    return;
+                try {
+                    byte[] msgTopicBytes = ioMsg.topicBytes();
+
+                    Object msgTopic = ioMsg.topic();
+
+                    GridDeployment dep = ioMsg.deployment();
+
+                    if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+                        ioMsg.deploymentClassName() != null) {
+                        dep = ctx.deploy().getGlobalDeployment(
+                            ioMsg.deploymentMode(),
+                            ioMsg.deploymentClassName(),
+                            ioMsg.deploymentClassName(),
+                            ioMsg.userVersion(),
+                            nodeId,
+                            ioMsg.classLoaderId(),
+                            ioMsg.loaderParticipants(),
+                            null);
+
+                        if (dep == null)
+                            throw new IgniteDeploymentCheckedException(
+                                "Failed to obtain deployment information for user message. " +
+                                    "If you are using custom message or topic class, try implementing " +
+                                    "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+                        ioMsg.deployment(dep); // Cache deployment.
+                    }
 
-                if (msgBody == null) {
-                    msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+                    // Unmarshall message topic if needed.
+                    if (msgTopic == null && msgTopicBytes != null) {
+                        msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
 
-                    ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
-                }
+                        ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+                    }
 
-                // Resource injection.
-                if (dep != null)
-                    ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
-                    msg + ']', e);
-            }
+                    if (!F.eq(topic, msgTopic))
+                        return;
+
+                    if (msgBody == null) {
+                        msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+
+                        ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+                    }
 
-            if (msgBody != null) {
-                if (predLsnr != null) {
-                    if (!predLsnr.apply(nodeId, msgBody))
-                        removeMessageListener(TOPIC_COMM_USER, this);
+                    // Resource injection.
+                    if (dep != null)
+                        ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
                 }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+                        msg + ']', e);
+                }
+
+                if (msgBody != null) {
+                    if (predLsnr != null) {
+                        if (!predLsnr.apply(nodeId, msgBody))
+                            removeMessageListener(TOPIC_COMM_USER, this);
+                    }
+                }
+            }
+            finally {
+                busyLock.readUnlock();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 1aef18c..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -274,7 +274,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
new file mode 100644
index 0000000..855a4f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.messaging;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implements Serializable {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Message topic. */
+    private enum TOPIC {
+        /** */
+        ORDERED
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new OptimizedMarshaller(false));
+
+        if (gridName.equals(getTestGridName(2))) {
+            cfg.setClientMode(true);
+
+            ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+        }
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMessageSendWithClientJoin() throws Exception {
+        startGrid(0);
+
+        Ignite ignite1 = startGrid(1);
+
+        ClusterGroup rmts = ignite1.cluster().forRemotes();
+
+        IgniteMessaging msg = ignite1.message(rmts);
+
+        msg.localListen(TOPIC.ORDERED, new LocalListener());
+
+        msg.remoteListen(TOPIC.ORDERED, new RemoteListener());
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+
+                while (!stop.get()) {
+                    if (iter % 10 == 0)
+                        log.info("Client start/stop iteration: " + iter);
+
+                    iter++;
+
+                    try (Ignite ignite = startGrid(2)) {
+                        assertTrue(ignite.configuration().isClientMode());
+                    }
+                }
+
+                return null;
+            }
+        }, 1, "client-start-stop");
+
+        try {
+            long stopTime = U.currentTimeMillis() + 30_000;
+
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stopTime) {
+                try {
+                    ignite1.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(iter), 0);
+                }
+                catch (IgniteException e) {
+                    log.info("Message send failed: " + e);
+                }
+
+                iter++;
+
+                if (iter % 100 == 0)
+                    Thread.sleep(5);
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        fut.get();
+    }
+
+    /**
+     *
+     */
+    private static class LocalListener implements IgniteBiPredicate<UUID, String> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, String s) {
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class RemoteListener implements IgniteBiPredicate<UUID, String> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID nodeId, String msg) {
+            ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg);
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9eb31f1..e0a1e6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -53,6 +53,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridSelfTest.class));
         suite.addTest(new TestSuite(GridProjectionSelfTest.class));
         suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+        suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
         suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class));
 
         if (U.isLinux() || U.isMacOs())


[16/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-989' into ignite-sprint-5

Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-989' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b12bb4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b12bb4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b12bb4a

Branch: refs/heads/ignite-389-ipc
Commit: 1b12bb4a80a04071c77d2603956505bd66fa19ea
Parents: bd3abbc 9ad3617
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:39:12 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:39:12 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  31 -----
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  56 +++++++-
 .../tcp/ipfinder/TcpDiscoveryIpFinder.java      |  10 +-
 .../TcpDiscoveryMulticastIpFinder.java          |  47 +++++--
 .../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   1 +
 .../gce/TcpDiscoveryGoogleStorageIpFinder.java  |  43 ++++---
 8 files changed, 244 insertions(+), 76 deletions(-)
----------------------------------------------------------------------



[11/29] incubator-ignite git commit: IGNITE-991 - Fix cache start from client node config.

Posted by ag...@apache.org.
IGNITE-991 - Fix cache start from client node config.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1de11fff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1de11fff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1de11fff

Branch: refs/heads/ignite-389-ipc
Commit: 1de11fff3cea7883a2f28a35107d7c9dfc75d5e0
Parents: 97d0bc1
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Jun 3 16:34:12 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Jun 3 16:34:12 2015 -0700

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |  2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  6 +-
 .../GridDhtPartitionsExchangeFuture.java        | 18 +++-
 ...niteDynamicCacheWithConfigStartSelfTest.java | 98 ++++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |  1 +
 5 files changed, 116 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 2049d03..c3f3e7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -220,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1ae4ae7..af121c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -249,7 +249,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -276,7 +276,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (cctx.rebalanceEnabled()) {
                 for (int p = 0; p < num; p++) {
                     // If this is the first node in grid.
-                    boolean added = exchFut.isCacheAdded(cctx.cacheId());
+                    boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
 
                     if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) {
                         assert exchId.isJoined() || added;
@@ -668,7 +668,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", allIds=" + allIds + ", node2part=" + node2part + ']';
+                ", allIds=" + allIds + ", node2part=" + node2part + ", cache=" + cctx.name() + ']';
 
             Collection<UUID> nodeIds = part2node.get(p);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index db43c6c..a03e2e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -295,7 +295,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cacheId Cache ID to check.
      * @return {@code True} if cache was added during this exchange.
      */
-    public boolean isCacheAdded(int cacheId) {
+    public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
         if (!F.isEmpty(reqs)) {
             for (DynamicCacheChangeRequest req : reqs) {
                 if (req.start() && !req.clientStartOnly()) {
@@ -305,7 +305,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
         }
 
-        return false;
+        GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+        return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
     }
 
     /**
@@ -505,11 +507,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             if (cacheCtx.isLocal())
                                 continue;
 
-                            cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
-
                             GridDhtPartitionTopology top = cacheCtx.topology();
 
                             top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+
+                            if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
+                                initTopology(cacheCtx);
+
+                                top.beforeExchange(this);
+                            }
+                            else
+                                cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
                         }
 
                         if (exchId.isLeft())
@@ -566,7 +574,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 assert oldestNode.get() != null;
 
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (isCacheAdded(cacheCtx.cacheId())) {
+                    if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
                         if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
                             U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
new file mode 100644
index 0000000..dcd6a69
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        if (client)
+            cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, String.class);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnClient() throws Exception {
+        int srvCnt = 3;
+
+        startGrids(srvCnt);
+
+        try {
+            client = true;
+
+            IgniteEx client = startGrid(srvCnt);
+
+            for (int i = 0; i < 100; i++)
+                client.cache(CACHE_NAME).put(i, i);
+
+            for (int i = 0; i < 100; i++)
+                assertEquals(i, grid(0).cache(CACHE_NAME).get(i));
+
+            client.cache(CACHE_NAME).removeAll();
+
+            for (int i = 0; i < 100; i++)
+                assertNull(grid(0).cache(CACHE_NAME).get(i));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index a8019d2..15756d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -99,6 +99,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class);
 
         suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
+        suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class);
         suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);
         suite.addTestSuite(IgniteCacheConfigurationTemplateTest.class);
         suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);


[21/29] incubator-ignite git commit: Merge branches 'ignite-983' and 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-983

Posted by ag...@apache.org.
Merge branches 'ignite-983' and 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-983


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46b24472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46b24472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46b24472

Branch: refs/heads/ignite-389-ipc
Commit: 46b244723a0c39c7bb5bb92157d71032d655923a
Parents: 51d4737 922c1c4
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:29:43 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:29:43 2015 +0700

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../processors/cache/GridCacheContext.java      |   3 -
 .../dht/GridDhtPartitionTopologyImpl.java       |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  10 +-
 .../internal/visor/query/VisorQueryJob.java     |   2 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  31 -----
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  56 +++++++-
 .../tcp/ipfinder/TcpDiscoveryIpFinder.java      |  10 +-
 .../TcpDiscoveryMulticastIpFinder.java          |  47 +++++--
 .../cache/IgniteDynamicCacheStartSelfTest.java  |  62 +++++++++
 .../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   1 +
 .../gce/TcpDiscoveryGoogleStorageIpFinder.java  |  43 ++++---
 .../commands/cache/VisorCacheScanCommand.scala  |   2 +-
 15 files changed, 326 insertions(+), 84 deletions(-)
----------------------------------------------------------------------