You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/12 16:25:06 UTC
[16/50] [abbrv] incubator-ignite git commit: ignite-sprint-6: merge
from ignite-sprint-5
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
index dc00ca6..d4ae147 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.ipc.shmem;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
@@ -25,6 +26,8 @@ import java.net.*;
import java.nio.channels.*;
import java.security.*;
import java.util.*;
+import java.util.jar.*;
+import java.util.zip.*;
import static org.apache.ignite.internal.IgniteVersionUtils.*;
@@ -36,6 +39,9 @@ public class IpcSharedMemoryNativeLoader {
/** Library name base. */
private static final String LIB_NAME_BASE = "igniteshmem";
+ /** Library jar name base. */
+ private static final String JAR_NAME_BASE = "shmem";
+
/** Library name. */
static final String LIB_NAME = LIB_NAME_BASE + "-" + VER_STR;
@@ -84,9 +90,10 @@ public class IpcSharedMemoryNativeLoader {
}
/**
+ * @param log Logger, if available. If null, warnings will be printed out to console.
* @throws IgniteCheckedException If failed.
*/
- public static void load() throws IgniteCheckedException {
+ public static void load(IgniteLogger log) throws IgniteCheckedException {
if (loaded)
return;
@@ -94,7 +101,7 @@ public class IpcSharedMemoryNativeLoader {
if (loaded)
return;
- doLoad();
+ doLoad(log);
loaded = true;
}
@@ -103,7 +110,7 @@ public class IpcSharedMemoryNativeLoader {
/**
* @throws IgniteCheckedException If failed.
*/
- private static void doLoad() throws IgniteCheckedException {
+ private static void doLoad(IgniteLogger log) throws IgniteCheckedException {
assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class);
Collection<Throwable> errs = new ArrayList<>();
@@ -124,7 +131,7 @@ public class IpcSharedMemoryNativeLoader {
// Obtain lock on file to prevent concurrent extracts.
try (RandomAccessFile randomAccessFile = new RandomAccessFile(lockFile, "rws");
- FileLock ignored = randomAccessFile.getChannel().lock()) {
+ FileLock ignored = randomAccessFile.getChannel().lock()) {
if (extractAndLoad(errs, tmpDir, platformSpecificResourcePath()))
return;
@@ -134,6 +141,31 @@ public class IpcSharedMemoryNativeLoader {
if (extractAndLoad(errs, tmpDir, resourcePath()))
return;
+ try {
+ if (log != null)
+ LT.warn(log, null, "Failed to load 'igniteshmem' library from classpath. Will try to load it from IGNITE_HOME.");
+
+ String igniteHome = X.resolveIgniteHome();
+
+ File shmemJar = findShmemJar(errs, igniteHome);
+
+ if (shmemJar != null) {
+ try (JarFile jar = new JarFile(shmemJar, false, JarFile.OPEN_READ)) {
+ if (extractAndLoad(errs, jar, tmpDir, platformSpecificResourcePath()))
+ return;
+
+ if (extractAndLoad(errs, jar, tmpDir, osSpecificResourcePath()))
+ return;
+
+ if (extractAndLoad(errs, jar, tmpDir, resourcePath()))
+ return;
+ }
+ }
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+
// Failed to find the library.
assert !errs.isEmpty();
@@ -145,6 +177,32 @@ public class IpcSharedMemoryNativeLoader {
}
/**
+ * Tries to find shmem jar in IGNITE_HOME/libs folder.
+ *
+ * @param errs Collection of errors to add readable exception to.
+ * @param igniteHome Resolver IGNITE_HOME variable.
+ * @return File, if found.
+ */
+ private static File findShmemJar(Collection<Throwable> errs, String igniteHome) {
+ File libs = new File(igniteHome, "libs");
+
+ if (!libs.exists() || libs.isFile()) {
+ errs.add(new IllegalStateException("Failed to find libs folder in resolved IGNITE_HOME: " + igniteHome));
+
+ return null;
+ }
+
+ for (File lib : libs.listFiles()) {
+ if (lib.getName().endsWith(".jar") && lib.getName().contains(JAR_NAME_BASE))
+ return lib;
+ }
+
+ errs.add(new IllegalStateException("Failed to find shmem jar in resolved IGNITE_HOME: " + igniteHome));
+
+ return null;
+ }
+
+ /**
* Gets temporary directory unique for each OS user.
* The directory guaranteed to exist, though may not be empty.
*/
@@ -220,6 +278,24 @@ public class IpcSharedMemoryNativeLoader {
/**
* @param errs Errors collection.
+ * @param rsrcPath Path.
+ * @return {@code True} if library was found and loaded.
+ */
+ private static boolean extractAndLoad(Collection<Throwable> errs, JarFile jar, File tmpDir, String rsrcPath) {
+ ZipEntry rsrc = jar.getEntry(rsrcPath);
+
+ if (rsrc != null)
+ return extract(errs, rsrc, jar, new File(tmpDir, mapLibraryName(LIB_NAME)));
+ else {
+ errs.add(new IllegalStateException("Failed to find resource within specified jar file " +
+ "[rsrc=" + rsrcPath + ", jar=" + jar.getName() + ']'));
+
+ return false;
+ }
+ }
+
+ /**
+ * @param errs Errors collection.
* @param src Source.
* @param target Target.
* @return {@code True} if resource was found and loaded.
@@ -230,7 +306,7 @@ public class IpcSharedMemoryNativeLoader {
InputStream is = null;
try {
- if (!target.exists() || !haveEqualMD5(target, src)) {
+ if (!target.exists() || !haveEqualMD5(target, src.openStream())) {
is = src.openStream();
if (is != null) {
@@ -265,20 +341,69 @@ public class IpcSharedMemoryNativeLoader {
}
/**
- * @param target Target.
+ * @param errs Errors collection.
* @param src Source.
+ * @param target Target.
+ * @return {@code True} if resource was found and loaded.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private static boolean extract(Collection<Throwable> errs, ZipEntry src, JarFile jar, File target) {
+ FileOutputStream os = null;
+ InputStream is = null;
+
+ try {
+ if (!target.exists() || !haveEqualMD5(target, jar.getInputStream(src))) {
+ is = jar.getInputStream(src);
+
+ if (is != null) {
+ os = new FileOutputStream(target);
+
+ int read;
+
+ byte[] buf = new byte[4096];
+
+ while ((read = is.read(buf)) != -1)
+ os.write(buf, 0, read);
+ }
+ }
+
+ // chmod 775.
+ if (!U.isWindows())
+ Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor();
+
+ System.load(target.getPath());
+
+ return true;
+ }
+ catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) {
+ errs.add(e);
+ }
+ finally {
+ U.closeQuiet(os);
+ U.closeQuiet(is);
+ }
+
+ return false;
+ }
+
+ /**
+ * @param target Target.
+ * @param srcIS Source input stream.
* @return {@code True} if target md5-sum equal to source md5-sum.
* @throws NoSuchAlgorithmException If md5 algorithm was not found.
* @throws IOException If an I/O exception occurs.
*/
- private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException {
- try (InputStream targetIS = new FileInputStream(target);
- InputStream srcIS = src.openStream()) {
-
- String targetMD5 = U.calculateMD5(targetIS);
- String srcMD5 = U.calculateMD5(srcIS);
+ private static boolean haveEqualMD5(File target, InputStream srcIS) throws NoSuchAlgorithmException, IOException {
+ try {
+ try (InputStream targetIS = new FileInputStream(target)) {
+ String targetMD5 = U.calculateMD5(targetIS);
+ String srcMD5 = U.calculateMD5(srcIS);
- return targetMD5.equals(srcMD5);
+ return targetMD5.equals(srcMD5);
+ }
+ }
+ finally {
+ srcIS.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 5185856..102c5b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -146,7 +146,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log);
pid = IpcSharedMemoryUtils.pid();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
new file mode 100644
index 0000000..e05c37a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class GridShmemCommunicationClient extends GridAbstractCommunicationClient {
+ /** */
+ private final IpcSharedMemoryClientEndpoint shmem;
+
+ /** */
+ private final ByteBuffer writeBuf;
+
+ /** */
+ private final MessageFormatter formatter;
+
+ /**
+ * @param metricsLsnr Metrics listener.
+ * @param port Shared memory IPC server port.
+ * @param connTimeout Connection timeout.
+ * @param log Logger.
+ * @param formatter Message formatter.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr,
+ int port,
+ long connTimeout,
+ IgniteLogger log,
+ MessageFormatter formatter)
+ throws IgniteCheckedException
+ {
+ super(metricsLsnr);
+
+ assert metricsLsnr != null;
+ assert port > 0 && port < 0xffff;
+ assert connTimeout >= 0;
+
+ shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log);
+
+ writeBuf = ByteBuffer.allocate(8 << 10);
+
+ writeBuf.order(ByteOrder.nativeOrder());
+
+ this.formatter = formatter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
+ throws IgniteCheckedException {
+ handshakeC.applyx(shmem.inputStream(), shmem.outputStream());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean close() {
+ boolean res = super.close();
+
+ if (res)
+ shmem.close();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void forceClose() {
+ super.forceClose();
+
+ // Do not call forceClose() here.
+ shmem.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void sendMessage(byte[] data, int len) throws IgniteCheckedException {
+ if (closed())
+ throw new IgniteCheckedException("Communication client was closed: " + this);
+
+ try {
+ shmem.outputStream().write(data, 0, len);
+
+ metricsLsnr.onBytesSent(len);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+ }
+
+ markUsed();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg)
+ throws IgniteCheckedException {
+ if (closed())
+ throw new IgniteCheckedException("Communication client was closed: " + this);
+
+ assert writeBuf.hasArray();
+
+ try {
+ int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer());
+
+ metricsLsnr.onBytesSent(cnt);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+ }
+
+ markUsed();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridShmemCommunicationClient.class, this, super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 359de1c..a661965 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -25,15 +25,19 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.nio.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.thread.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -139,6 +143,10 @@ import static org.apache.ignite.events.EventType.*;
@IgniteSpiConsistencyChecked(optional = false)
public class TcpCommunicationSpi extends IgniteSpiAdapter
implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+ /** IPC error message. */
+ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
+ "(switching to TCP, may be slower).";
+
/** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */
public static final String ATTR_ADDRS = "comm.tcp.addrs";
@@ -148,12 +156,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Node attribute that is mapped to node port number (value is <tt>comm.tcp.port</tt>). */
public static final String ATTR_PORT = "comm.tcp.port";
+ /** Node attribute that is mapped to node port number (value is <tt>comm.shmem.tcp.port</tt>). */
+ public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";
+
/** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
/** Default port which node sets listener to (value is <tt>47100</tt>). */
public static final int DFLT_PORT = 47100;
+ /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */
+ public static final int DFLT_SHMEM_PORT = 48100;
+
/** Default idle connection timeout (value is <tt>30000</tt>ms). */
public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
@@ -287,7 +301,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert ses.accepted();
if (msg instanceof NodeIdMessage)
- sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+ sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
else {
assert msg instanceof HandshakeMessage : msg;
@@ -316,6 +330,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient oldClient = clients.get(sndId);
+ boolean hasShmemClient = false;
+
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isDebugEnabled())
@@ -327,6 +343,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
}
GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
@@ -353,10 +374,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
}
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut));
+ new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
if (log.isDebugEnabled())
log.debug("Received incoming connection from remote node " +
@@ -365,7 +391,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (reserved) {
try {
GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg0.received(), true);
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
fut.onDone(client);
}
@@ -387,11 +413,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
else {
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut));
+ new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
if (reserved) {
GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg0.received(), true);
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
fut.onDone(client);
}
@@ -459,6 +485,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param node Node.
* @param rcvCnt Number of received messages..
* @param sndRes If {@code true} sends response for recovery handshake.
+ * @param createClient If {@code true} creates NIO communication client.
* @return Client.
*/
private GridTcpNioCommunicationClient connected(
@@ -466,7 +493,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioSession ses,
ClusterNode node,
long rcvCnt,
- boolean sndRes) {
+ boolean sndRes,
+ boolean createClient) {
recovery.onHandshake(rcvCnt);
ses.recoveryDescriptor(recovery);
@@ -478,12 +506,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
recovery.connected();
- GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log);
+ GridTcpNioCommunicationClient client = null;
- GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+ if (createClient) {
+ client = new GridTcpNioCommunicationClient(ses, log);
- assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
+ GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+
+ assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']';
+ }
return client;
}
@@ -511,22 +543,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final GridFutureAdapter<GridCommunicationClient> fut;
+ /** */
+ private final boolean createClient;
+
/**
* @param ses Incoming session.
* @param recoveryDesc Recovery descriptor.
* @param rmtNode Remote node.
* @param msg Handshake message.
+ * @param createClient If {@code true} creates NIO communication client..
* @param fut Connect future.
*/
ConnectClosure(GridNioSession ses,
GridNioRecoveryDescriptor recoveryDesc,
ClusterNode rmtNode,
HandshakeMessage msg,
+ boolean createClient,
GridFutureAdapter<GridCommunicationClient> fut) {
this.ses = ses;
this.recoveryDesc = recoveryDesc;
this.rmtNode = rmtNode;
this.msg = msg;
+ this.createClient = createClient;
this.fut = fut;
}
@@ -539,7 +577,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
msgFut.get();
GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg.received(), false);
+ connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
fut.onDone(client);
}
@@ -588,6 +626,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Local port range. */
private int locPortRange = DFLT_PORT_RANGE;
+ /** Local port which node uses to accept shared memory connections. */
+ private int shmemPort = DFLT_SHMEM_PORT;
+
/** Allocate direct buffer or heap buffer. */
private boolean directBuf = true;
@@ -622,6 +663,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** NIO server. */
private GridNioServer<Message> nioSrvr;
+ /** Shared memory server. */
+ private IpcSharedMemoryServerEndpoint shmemSrv;
+
/** {@code TCP_NODELAY} option value for created sockets. */
private boolean tcpNoDelay = DFLT_TCP_NODELAY;
@@ -636,6 +680,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Recovery and idle clients handler. */
private CommunicationWorker commWorker;
+
+ /** Shared memory accept worker. */
+ private ShmemAcceptWorker shmemAcceptWorker;
+
+ /** Shared memory workers. */
+ private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
/** Clients. */
private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
@@ -646,6 +696,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Bound port. */
private int boundTcpPort = -1;
+ /** Bound port for shared memory server. */
+ private int boundTcpShmemPort = -1;
+
/** Count of selectors to use in TCP server. */
private int selectorsCnt = DFLT_SELECTORS_CNT;
@@ -789,6 +842,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * Sets local port to accept shared memory connections.
+ * <p>
+ * If set to {@code -1} shared memory communication will be disabled.
+ * <p>
+ * If not provided, default value is {@link #DFLT_SHMEM_PORT}.
+ *
+ * @param shmemPort Port number.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setSharedMemoryPort(int shmemPort) {
+ this.shmemPort = shmemPort;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSharedMemoryPort() {
+ return shmemPort;
+ }
+
+ /**
* Sets maximum idle connection timeout upon which a connection
* to client will be closed.
* <p>
@@ -1153,6 +1225,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
+ assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
assertParameter(reconCnt > 0, "reconnectCnt > 0");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
assertParameter(minBufferedMsgCnt >= 0, "minBufferedMsgCnt >= 0");
@@ -1178,6 +1251,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
+ shmemSrv = resetShmemServer();
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to start shared memory communication server.", e);
+ }
+
+ try {
// This method potentially resets local port to the value
// local node was bound to.
nioSrvr = resetNioServer();
@@ -1197,6 +1277,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
createSpiAttributeName(ATTR_ADDRS), addrs.get1(),
createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(),
createSpiAttributeName(ATTR_PORT), boundTcpPort,
+ createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null,
createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
}
catch (IOException | IgniteCheckedException e) {
@@ -1223,6 +1304,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("tcpNoDelay", tcpNoDelay));
log.debug(configInfo("sockSndBuf", sockSndBuf));
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
+ log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt));
log.debug(configInfo("connTimeout", connTimeout));
@@ -1239,6 +1321,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
+ if (shmemSrv != null) {
+ shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
+
+ new IgniteThread(shmemAcceptWorker).start();
+ }
+
nioSrvr.start();
commWorker = new CommunicationWorker();
@@ -1254,6 +1342,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP);
+ // SPI can start without shmem port.
+ if (boundTcpShmemPort > 0)
+ spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);
+
spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
ctxInitLatch.countDown();
@@ -1294,7 +1386,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// If configured TCP port is busy, find first available in range.
for (int port = locPort; port < locPort + locPortRange; port++) {
try {
- MessageFactory messageFactory = new MessageFactory() {
+ MessageFactory msgFactory = new MessageFactory() {
private MessageFactory impl;
@Nullable @Override public Message create(byte type) {
@@ -1307,7 +1399,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- MessageFormatter messageFormatter = new MessageFormatter() {
+ MessageFormatter msgFormatter = new MessageFormatter() {
private MessageFormatter impl;
@Override public MessageWriter writer() {
@@ -1329,7 +1421,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter);
+ GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter);
IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
@Override public boolean apply(Message msg) {
@@ -1356,7 +1448,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.writeTimeout(sockWriteTimeout)
.filters(new GridNioCodecFilter(parser, log, true),
new GridConnectionBytesVerifyFilter(log))
- .messageFormatter(messageFormatter)
+ .messageFormatter(msgFormatter)
.skipRecoveryPredicate(skipRecoveryPred)
.build();
@@ -1388,6 +1480,55 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
}
+ /**
+ * Creates new shared memory communication server.
+ * @return Server.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
+ if (boundTcpShmemPort >= 0)
+ throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);
+
+ if (shmemPort == -1 || U.isWindows())
+ return null;
+
+ IgniteCheckedException lastEx = null;
+
+ // If configured TCP port is busy, find first available in range.
+ for (int port = shmemPort; port < shmemPort + locPortRange; port++) {
+ try {
+ IpcSharedMemoryServerEndpoint srv =
+ new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName);
+
+ srv.setPort(port);
+
+ srv.omitOutOfResourcesWarning(true);
+
+ srv.start();
+
+ boundTcpShmemPort = port;
+
+ // Ack Port the TCP server was bound to.
+ if (log.isInfoEnabled())
+ log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
+ ", locHost=" + locHost + ']');
+
+ return srv;
+ }
+ catch (IgniteCheckedException e) {
+ lastEx = e;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
+ ", locHost=" + locHost + ']');
+ }
+ }
+
+ // If free port wasn't found.
+ throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
+ locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
+ }
+
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
assert isNodeStopping();
@@ -1399,9 +1540,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
nioSrvr.stop();
U.interrupt(commWorker);
-
U.join(commWorker, log);
+ U.cancel(shmemAcceptWorker);
+ U.join(shmemAcceptWorker, log);
+
+ U.cancel(shmemWorkers);
+ U.join(shmemWorkers, log);
+
+ shmemWorkers.clear();
+
// Force closing on stop (safety).
for (GridCommunicationClient client : clients.values())
client.forceClose();
@@ -1612,13 +1760,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
assert node != null;
- if (getSpiContext().localNode() == null)
+ Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
+
+ ClusterNode locNode = getSpiContext().localNode();
+
+ if (locNode == null)
throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
+ // If remote node has shared memory server enabled and has the same set of MACs
+ // then we are likely to run on the same host and shared memory communication could be tried.
+ if (shmemPort != null && U.sameMacs(locNode, node)) {
+ try {
+ return createShmemClient(node, shmemPort);
+ }
+ catch (IgniteCheckedException e) {
+ if (e.hasCause(IpcOutOfSystemResourcesException.class))
+ // Has cause or is itself the IpcOutOfSystemResourcesException.
+ LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
+ else if (getSpiContext().node(node.id()) != null)
+ LT.warn(log, null, e.getMessage());
+ else if (log.isDebugEnabled())
+ log.debug("Failed to establish shared memory connection with local node (node has left): " +
+ node.id());
+ }
+ }
+
return createTcpClient(node);
}
/**
+ * @param node Node.
+ * @param port Port.
+ * @return Client.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException {
+ int attempt = 1;
+
+ int connectAttempts = 1;
+
+ long connTimeout0 = connTimeout;
+
+ while (true) {
+ GridCommunicationClient client;
+
+ try {
+ client = new GridShmemCommunicationClient(metricsLsnr,
+ port,
+ connTimeout,
+ log,
+ getSpiContext().messageFormatter());
+ }
+ catch (IgniteCheckedException e) {
+ // Reconnect for the second time, if connection is not established.
+ if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
+ connectAttempts++;
+
+ continue;
+ }
+
+ throw e;
+ }
+
+ try {
+ safeHandshake(client, null, node.id(), connTimeout0);
+ }
+ catch (HandshakeTimeoutException e) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ ", err=" + e.getMessage() + ", client=" + client + ']');
+
+ client.forceClose();
+
+ if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
+ "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
+ ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+ ", err=" + e.getMessage() + ", client=" + client + ']');
+
+ throw e;
+ }
+ else {
+ attempt++;
+
+ connTimeout0 *= 2;
+
+ continue;
+ }
+ }
+ catch (IgniteCheckedException | RuntimeException | Error e) {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
+
+ client.forceClose();
+
+ throw e;
+ }
+
+ return client;
+ }
+ }
+
+ /**
* Establish TCP connection to remote node and returns client.
*
* @param node Remote node.
@@ -2095,6 +2340,144 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * This worker takes responsibility to shut the server down when stopping,
+ * No other thread shall stop passed server.
+ */
+ private class ShmemAcceptWorker extends GridWorker {
+ /** */
+ private final IpcSharedMemoryServerEndpoint srv;
+
+ /**
+ * @param srv Server.
+ */
+ ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
+ super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);
+
+ this.srv = srv;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ try {
+ while (!Thread.interrupted()) {
+ ShmemWorker e = new ShmemWorker(srv.accept());
+
+ shmemWorkers.add(e);
+
+ new IgniteThread(e).start();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (!isCancelled())
+ U.error(log, "Shmem server failed.", e);
+ }
+ finally {
+ srv.close();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ super.cancel();
+
+ srv.close();
+ }
+ }
+
+ /**
+ *
+ */
+ private class ShmemWorker extends GridWorker {
+ /** */
+ private final IpcEndpoint endpoint;
+
+ /**
+ * @param endpoint Endpoint.
+ */
+ private ShmemWorker(IpcEndpoint endpoint) {
+ super(gridName, "shmem-worker", TcpCommunicationSpi.this.log);
+
+ this.endpoint = endpoint;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ try {
+ MessageFactory msgFactory = new MessageFactory() {
+ private MessageFactory impl;
+
+ @Nullable @Override public Message create(byte type) {
+ if (impl == null)
+ impl = getSpiContext().messageFactory();
+
+ assert impl != null;
+
+ return impl.create(type);
+ }
+ };
+
+ MessageFormatter msgFormatter = new MessageFormatter() {
+ private MessageFormatter impl;
+
+ @Override public MessageWriter writer() {
+ if (impl == null)
+ impl = getSpiContext().messageFormatter();
+
+ assert impl != null;
+
+ return impl.writer();
+ }
+
+ @Override public MessageReader reader(MessageFactory factory) {
+ if (impl == null)
+ impl = getSpiContext().messageFormatter();
+
+ assert impl != null;
+
+ return impl.reader(factory);
+ }
+ };
+
+ IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
+ metricsLsnr,
+ log,
+ endpoint,
+ srvLsnr,
+ msgFormatter,
+ new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true),
+ new GridConnectionBytesVerifyFilter(log)
+ );
+
+ adapter.serve();
+ }
+ finally {
+ shmemWorkers.remove(this);
+
+ endpoint.close();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ super.cancel();
+
+ endpoint.close();
+ }
+
+ /** @{@inheritDoc} */
+ @Override protected void cleanup() {
+ super.cleanup();
+
+ endpoint.close();
+ }
+
+ /** @{@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ShmemWorker.class, this);
+ }
+ }
+
+ /**
*
*/
private class CommunicationWorker extends IgniteSpiThread {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 6f5a738..fe4f581 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -44,6 +44,14 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public int getLocalPort();
/**
+ * Gets local port for shared memory communication.
+ *
+ * @return Port number.
+ */
+ @MXBeanDescription("Shared memory endpoint port number.")
+ public int getSharedMemoryPort();
+
+ /**
* Gets maximum number of local ports tried if all previously
* tried ports are occupied.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 9bfbd15..128d452 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -356,6 +356,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/**
+ * @throws Exception If failed.
+ */
+ public void testRemoveAllSkipStore() throws Exception {
+ IgniteCache<String, Integer> jcache = jcache();
+
+ jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3));
+
+ jcache.withSkipStore().removeAll();
+
+ assertEquals((Integer)1, jcache.get("1"));
+ assertEquals((Integer)2, jcache.get("2"));
+ assertEquals((Integer)3, jcache.get("3"));
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
public void testAtomicOps() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index db9e6a8..7905565 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -807,6 +807,25 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testGetOrCreateMultiNodeTemplate() throws Exception {
+ final AtomicInteger idx = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx0 = idx.getAndIncrement();
+
+ ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME);
+
+ return null;
+ }
+ }, nodeCount() * 4, "runner");
+
+ ignite(0).destroyCache(DYNAMIC_CACHE_NAME);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testGetOrCreateNearOnlyMultiNode() throws Exception {
checkGetOrCreateNear(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
new file mode 100644
index 0000000..24ebb7c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int SRVS = 4;
+
+ /** */
+ private boolean client;
+
+ /** */
+ private boolean clientDiscovery;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ if (!clientDiscovery)
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+
+ cfg.setClientMode(client);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setWriteSynchronizationMode(PRIMARY_SYNC);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(SRVS);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testManyClients() throws Exception {
+ manyClientsPutGet();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testManyClientsClientDiscovery() throws Exception {
+ clientDiscovery = true;
+
+ manyClientsPutGet();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void manyClientsPutGet() throws Exception {
+ client = true;
+
+ final AtomicInteger idx = new AtomicInteger(SRVS);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final int THREADS = 30;
+
+ final CountDownLatch latch = new CountDownLatch(THREADS);
+
+ try {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Ignite ignite = startGrid(idx.getAndIncrement())) {
+ log.info("Started node: " + ignite.name());
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int iter = 0;
+
+ Integer key = rnd.nextInt(0, 1000);
+
+ cache.put(key, iter++);
+
+ assertNotNull(cache.get(key));
+
+ latch.countDown();
+
+ while (!stop.get()) {
+ key = rnd.nextInt(0, 1000);
+
+ cache.put(key, iter++);
+
+ assertNotNull(cache.get(key));
+ }
+
+ log.info("Stopping node: " + ignite.name());
+ }
+
+ return null;
+ }
+ }, THREADS, "client-thread");
+
+ latch.await();
+
+ Thread.sleep(10_000);
+
+ log.info("Stop clients.");
+
+ stop.set(true);
+
+ fut.get();
+ }
+ finally {
+ stop.set(true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 96abe5f..8031315 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -50,6 +50,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
commSpi.setSocketWriteTimeout(1000);
+ commSpi.setSharedMemoryPort(-1);
cfg.setCommunicationSpi(commSpi);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index 9d41074..4601586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
for (int j = 0; j < G.allGrids().size(); j++) {
GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
- CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+ CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
@IgniteInstanceResource
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 62bf3f7..cc8217d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
for (int j = 0; j < G.allGrids().size(); j++) {
GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
- CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+ CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
final int i0 = j;
final int j0 = i;
@@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
Object v1 = e.getValue();
Object v2 = ((IgniteKernal)grid).getCache("one").get(key);
- assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 +
- ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
+ assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" +
+ key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
assertEquals(v1, v2);
}
catch (IgniteCheckedException e1) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
index 068a46c..6ccfbc2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
@@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
* @throws Exception If failed.
*/
public void testQuery() throws Exception {
- checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+ checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false);
- checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+ checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false);
+
+ checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true);
+
+ checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true);
}
/**
* @param cache Cache.
+ * @param scanPartitions Scan partitions.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void checkQuery(GridCacheAdapter cache) throws Exception {
+ private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception {
final int ENTRY_CNT = 500;
- for (int i = 0; i < ENTRY_CNT; i++)
- cache.getAndPut(new Key(i), new Person("p-" + i, i));
+ Map<Integer, Map<Key, Person>> entries = new HashMap<>();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Key key = new Key(i);
+ Person val = new Person("p-" + i, i);
+
+ int part = cache.context().affinity().partition(key);
+
+ cache.getAndPut(key, val);
+
+ Map<Key, Person> partEntries = entries.get(part);
+
+ if (partEntries == null)
+ entries.put(part, partEntries = new HashMap<>());
+
+ partEntries.put(key, val);
+ }
try {
- CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
- new IgniteBiPredicate<Key, Person>() {
- @Override public boolean apply(Key key, Person p) {
- assertEquals(key.id, (Integer)p.salary);
+ int partitions = scanPartitions ? cache.context().affinity().partitions() : 1;
- return key.id % 2 == 0;
- }
- }, false);
+ for (int i = 0; i < partitions; i++) {
+ CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
+ new IgniteBiPredicate<Key, Person>() {
+ @Override public boolean apply(Key key, Person p) {
+ assertEquals(key.id, (Integer)p.salary);
- Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+ return key.id % 2 == 0;
+ }
+ }, (scanPartitions ? i : null), false);
- assertEquals(ENTRY_CNT / 2, res.size());
+ Collection<Map.Entry<Key, Person>> res = qry.execute().get();
- for (Map.Entry<Key, Person> e : res) {
- Key k = e.getKey();
- Person p = e.getValue();
+ if (!scanPartitions)
+ assertEquals(ENTRY_CNT / 2, res.size());
- assertEquals(k.id, (Integer)p.salary);
- assertEquals(0, k.id % 2);
- }
+ for (Map.Entry<Key, Person> e : res) {
+ Key k = e.getKey();
+ Person p = e.getValue();
- qry = cache.context().queries().createScanQuery(null, false);
+ assertEquals(k.id, (Integer)p.salary);
+ assertEquals(0, k.id % 2);
- res = qry.execute().get();
+ if (scanPartitions) {
+ Map<Key, Person> partEntries = entries.get(i);
- assertEquals(ENTRY_CNT, res.size());
+ assertEquals(p, partEntries.get(k));
+ }
+ }
+
+ qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false);
+
+ res = qry.execute().get();
+
+ if (!scanPartitions)
+ assertEquals(ENTRY_CNT, res.size());
+ }
testMultithreaded(cache, ENTRY_CNT / 2);
}
@@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return key.id % 2 == 0;
}
- }, false);
+ }, null, false);
for (int i = 0; i < 250; i++) {
Collection<Map.Entry<Key, Person>> res = qry.execute().get();
@@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return val % 2 == 0;
}
- }, false);
+ }, null, false);
Collection<Map.Entry<String, Long>> res = qry.execute().get();
@@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
assertEquals(0, val % 2);
}
- qry = cache.context().queries().createScanQuery(null, false);
+ qry = cache.context().queries().createScanQuery(null, null, false);
res = qry.execute().get();
@@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return key % 2 == 0;
}
- }, false);
+ }, null, false);
Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get();
@@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
assertEquals(0, key % 2);
}
- qry = cache.context().queries().createScanQuery(null, false);
+ qry = cache.context().queries().createScanQuery(null, null, false);
res = qry.execute().get();
@@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
this.name = name;
this.salary = salary;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Person person = (Person)o;
+
+ if (salary != person.salary)
+ return false;
+
+ return !(name != null ? !name.equals(person.name) : person.name != null);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+
+ return 31 * result + salary;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java
index 1a8fd10..e220031 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java
@@ -49,6 +49,8 @@ public class IgfsSharedMemoryTestServer {
srv.start();
+ System.out.println("IPC shared memory server endpoint started");
+
IpcEndpoint clientEndpoint = srv.accept();
is = clientEndpoint.inputStream();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
index 2ddf6f3..c6f590e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
index 7dc0870..4afb64b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
@@ -51,7 +51,7 @@ public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
index 4c5413c..176429e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
@@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
index 8ff827b..8fee239 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
@@ -37,7 +37,7 @@ public class LoadWithCorruptedLibFileTestRunner {
createCorruptedLibFile();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
index 28495af..89eeda1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
@@ -43,7 +43,7 @@ public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkP
* @throws IgniteCheckedException If failed.
*/
public static void main(String[] args) throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
index 2ade145..e8a8402 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkP
* @throws IgniteCheckedException If failed.
*/
public static void main(String[] args) throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index 422d608..ea5b716 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -455,6 +455,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
spi.setTcpNoDelay(true);
spi.setConnectionBufferSize(0);
+ spi.setSharedMemoryPort(-1);
info("Comm SPI: " + spi);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index ed9e0cf..744635d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -115,6 +115,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
commSpi.setLocalAddress("127.0.0.1");
commSpi.setLocalPort(commLocPort);
commSpi.setLocalPortRange(1);
+ commSpi.setSharedMemoryPort(-1);
cfg.setCommunicationSpi(commSpi);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 8d27485..eee38a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -37,10 +37,23 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
/** */
public static final int IDLE_CONN_TIMEOUT = 2000;
+ /** */
+ private final boolean useShmem;
+
+ /**
+ * @param useShmem Use shared mem flag.
+ */
+ protected GridTcpCommunicationSpiAbstractTest(boolean useShmem) {
+ this.useShmem = useShmem;
+ }
+
/** {@inheritDoc} */
@Override protected CommunicationSpi getSpi(int idx) {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
+ if (!useShmem)
+ spi.setSharedMemoryPort(-1);
+
spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT);
spi.setTcpNoDelay(tcpNoDelay());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 2d175f5..a5cd7ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -181,8 +181,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
if (load) {
loadFut = GridTestUtils.runMultiThreadedAsync(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
+ @Override public Long call() throws Exception {
long dummyRes = 0;
List<String> list = new ArrayList<>();
@@ -300,6 +299,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
spi.setLocalPort(port++);
spi.setIdleConnectionTimeout(60_000);
spi.setConnectTimeout(10_000);
+ spi.setSharedMemoryPort(-1);
return spi;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 3916f02..f682f01 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -55,12 +55,14 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
/** Message id sequence. */
private AtomicLong msgId = new AtomicLong();
+ /** */
+ private final boolean useShmem;
+
/** SPI resources. */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
/** SPIs */
- private static final Map<UUID, CommunicationSpi<Message>> spis =
- new ConcurrentHashMap<>();
+ private static final Map<UUID, CommunicationSpi<Message>> spis = new ConcurrentHashMap<>();
/** Listeners. */
private static final Map<UUID, MessageListener> lsnrs = new HashMap<>();
@@ -80,9 +82,19 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
}
/**
+ * @param useShmem Use shared mem.
*/
- public GridTcpCommunicationSpiMultithreadedSelfTest() {
+ protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
super(false);
+
+ this.useShmem = useShmem;
+ }
+
+ /**
+ *
+ */
+ public GridTcpCommunicationSpiMultithreadedSelfTest() {
+ this(false);
}
/**
@@ -413,6 +425,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
private CommunicationSpi<Message> newCommunicationSpi() {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
+ if (!useShmem)
+ spi.setSharedMemoryPort(-1);
+
spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
new file mode 100644
index 0000000..590b426
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+/**
+ *
+ */
+public class GridTcpCommunicationSpiMultithreadedShmemTest extends GridTcpCommunicationSpiMultithreadedSelfTest {
+ /** */
+ public GridTcpCommunicationSpiMultithreadedShmemTest() {
+ super(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index c0f0b11..1a4ba22 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -324,6 +324,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
spi.setTcpNoDelay(true);
spi.setAckSendThreshold(ackCnt);
spi.setMessageQueueLimit(queueLimit);
+ spi.setSharedMemoryPort(-1);
return spi;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 7463388..5d3afd9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -608,6 +608,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
protected TcpCommunicationSpi getSpi(int idx) {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
+ spi.setSharedMemoryPort(-1);
spi.setLocalPort(port++);
spi.setIdleConnectionTimeout(10_000);
spi.setConnectTimeout(10_000);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
new file mode 100644
index 0000000..5746a3c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.testframework.junits.spi.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiShmemSelfTest extends GridTcpCommunicationSpiAbstractTest {
+ /**
+ *
+ */
+ public GridTcpCommunicationSpiShmemSelfTest() {
+ super(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean tcpNoDelay() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
index 32bced2..c27a86f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
@@ -24,6 +24,13 @@ import org.apache.ignite.testframework.junits.spi.*;
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
public class GridTcpCommunicationSpiTcpSelfTest extends GridTcpCommunicationSpiAbstractTest {
+ /**
+ *
+ */
+ public GridTcpCommunicationSpiTcpSelfTest() {
+ super(false);
+ }
+
/** {@inheritDoc} */
@Override protected boolean tcpNoDelay() {
return true;