You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/05 04:19:54 UTC
[01/29] incubator-ignite git commit: # ignite-970
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-389-ipc 7ee51ba05 -> 6b51f99e7
# ignite-970
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/104a13fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/104a13fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/104a13fd
Branch: refs/heads/ignite-389-ipc
Commit: 104a13fd2118804e42b5035df4340d7374c36e82
Parents: 1dbdd42
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 14:25:08 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 14:25:08 2015 +0300
----------------------------------------------------------------------
.../util/nio/GridShmemCommunicationClient.java | 151 +++++++
.../communication/tcp/TcpCommunicationSpi.java | 414 ++++++++++++++++++-
.../tcp/TcpCommunicationSpiMBean.java | 8 +
.../IgniteCacheMessageRecoveryAbstractTest.java | 1 +
.../spi/GridTcpSpiForwardingSelfTest.java | 1 +
.../GridTcpCommunicationSpiAbstractTest.java | 13 +
...mmunicationSpiConcurrentConnectSelfTest.java | 4 +-
...cpCommunicationSpiMultithreadedSelfTest.java | 11 +-
...pCommunicationSpiMultithreadedShmemTest.java | 28 ++
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 +
...GridTcpCommunicationSpiRecoverySelfTest.java | 1 +
.../GridTcpCommunicationSpiShmemSelfTest.java | 38 ++
.../tcp/GridTcpCommunicationSpiTcpSelfTest.java | 7 +
.../IgniteSpiCommunicationSelfTestSuite.java | 2 +
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 13 +
...oopSecondaryFileSystemConfigurationTest.java | 14 +
.../hadoop/HadoopAbstractSelfTest.java | 6 +
17 files changed, 695 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
new file mode 100644
index 0000000..f3dc46f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Communication client that sends data through a shared memory IPC client endpoint.
+ */
+public class GridShmemCommunicationClient extends GridAbstractCommunicationClient {
+ /** Shared memory client endpoint that all messages are written to. */
+ private final IpcSharedMemoryClientEndpoint shmem;
+
+ /** Heap buffer (8 KB, native byte order) handed to {@code U.writeMessageFully} when sending messages. */
+ private final ByteBuffer writeBuf;
+
+ /** Formatter that supplies the message writer used by {@link #sendMessage(UUID, Message)}. */
+ private final MessageFormatter formatter;
+
+ /**
+ * @param metricsLsnr Metrics listener (must not be {@code null}).
+ * @param port Shared memory IPC server port (asserted to be greater than 0 and less than 0xffff).
+ * @param connTimeout Connection timeout (must be non-negative; narrowed to {@code int} for the endpoint).
+ * @param log Logger.
+ * @param formatter Message formatter.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr,
+ int port,
+ long connTimeout,
+ IgniteLogger log,
+ MessageFormatter formatter)
+ throws IgniteCheckedException
+ {
+ super(metricsLsnr);
+
+ assert metricsLsnr != null;
+ assert port > 0 && port < 0xffff;
+ assert connTimeout >= 0;
+
+ shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log);
+
+ writeBuf = ByteBuffer.allocate(8 << 10);
+
+ writeBuf.order(ByteOrder.nativeOrder());
+
+ this.formatter = formatter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
+ throws IgniteCheckedException {
+ handshakeC.applyx(shmem.inputStream(), shmem.outputStream());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean close() {
+ boolean res = super.close();
+
+ if (res)
+ shmem.close();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void forceClose() {
+ super.forceClose();
+
+ // Do not call forceClose() here.
+ shmem.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void sendMessage(byte[] data, int len) throws IgniteCheckedException {
+ if (closed())
+ throw new IgniteCheckedException("Communication client was closed: " + this);
+
+ try {
+ shmem.outputStream().write(data, 0, len);
+
+ metricsLsnr.onBytesSent(len);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+ }
+
+ markUsed();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg)
+ throws IgniteCheckedException {
+ if (closed())
+ throw new IgniteCheckedException("Communication client was closed: " + this);
+
+ assert writeBuf.hasArray();
+
+ try {
+ int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer());
+
+ metricsLsnr.onBytesSent(cnt);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+ }
+
+ markUsed();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flushIfNeeded(long timeout) throws IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridShmemCommunicationClient.class, this, super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 19e54c8..3768db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -25,15 +25,19 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.nio.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.thread.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -139,6 +143,10 @@ import static org.apache.ignite.events.EventType.*;
@IgniteSpiConsistencyChecked(optional = false)
public class TcpCommunicationSpi extends IgniteSpiAdapter
implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+ /** IPC error message. */
+ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
+ "(switching to TCP, may be slower).";
+
/** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */
public static final String ATTR_ADDRS = "comm.tcp.addrs";
@@ -148,12 +156,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Node attribute that is mapped to node port number (value is <tt>comm.tcp.port</tt>). */
public static final String ATTR_PORT = "comm.tcp.port";
+ /** Node attribute that is mapped to node shared memory port number (value is <tt>comm.shmem.tcp.port</tt>). */
+ public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";
+
/** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
/** Default port which node sets listener to (value is <tt>47100</tt>). */
public static final int DFLT_PORT = 47100;
+ /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */
+ public static final int DFLT_SHMEM_PORT = 48100;
+
/** Default idle connection timeout (value is <tt>30000</tt>ms). */
public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
@@ -293,7 +307,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert ses.accepted();
if (msg instanceof NodeIdMessage)
- sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+ sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
else {
assert msg instanceof HandshakeMessage : msg;
@@ -322,6 +336,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient oldClient = clients.get(sndId);
+ boolean hasShmemClient = false;
+
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isDebugEnabled())
@@ -333,6 +349,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
}
GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
@@ -359,10 +380,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
}
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut));
+ new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
if (log.isDebugEnabled())
log.debug("Received incoming connection from remote node " +
@@ -371,7 +397,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (reserved) {
try {
GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg0.received(), true);
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
fut.onDone(client);
}
@@ -393,11 +419,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
else {
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut));
+ new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
if (reserved) {
GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg0.received(), true);
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
fut.onDone(client);
}
@@ -465,6 +491,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param node Node.
* @param rcvCnt Number of received messages..
* @param sndRes If {@code true} sends response for recovery handshake.
+ * @param createClient If {@code true} creates NIO communication client.
* @return Client.
*/
private GridTcpNioCommunicationClient connected(
@@ -472,7 +499,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioSession ses,
ClusterNode node,
long rcvCnt,
- boolean sndRes) {
+ boolean sndRes,
+ boolean createClient) {
recovery.onHandshake(rcvCnt);
ses.recoveryDescriptor(recovery);
@@ -484,12 +512,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
recovery.connected();
- GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log);
+ GridTcpNioCommunicationClient client = null;
+
+ if (createClient) {
+ client = new GridTcpNioCommunicationClient(ses, log);
- GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+ GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
- assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
+ assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']';
+ }
return client;
}
@@ -517,22 +549,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final GridFutureAdapter<GridCommunicationClient> fut;
+ /** */
+ private final boolean createClient;
+
/**
* @param ses Incoming session.
* @param recoveryDesc Recovery descriptor.
* @param rmtNode Remote node.
* @param msg Handshake message.
+ * @param createClient If {@code true} creates NIO communication client.
* @param fut Connect future.
*/
ConnectClosure(GridNioSession ses,
GridNioRecoveryDescriptor recoveryDesc,
ClusterNode rmtNode,
HandshakeMessage msg,
+ boolean createClient,
GridFutureAdapter<GridCommunicationClient> fut) {
this.ses = ses;
this.recoveryDesc = recoveryDesc;
this.rmtNode = rmtNode;
this.msg = msg;
+ this.createClient = createClient;
this.fut = fut;
}
@@ -545,7 +583,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
msgFut.get();
GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg.received(), false);
+ connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
fut.onDone(client);
}
@@ -594,6 +632,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Local port range. */
private int locPortRange = DFLT_PORT_RANGE;
+ /** Local port which node uses to accept shared memory connections. */
+ private int shmemPort = DFLT_SHMEM_PORT;
+
/** Allocate direct buffer or heap buffer. */
private boolean directBuf = true;
@@ -635,6 +676,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** NIO server. */
private GridNioServer<Message> nioSrvr;
+ /** Shared memory server. */
+ private IpcSharedMemoryServerEndpoint shmemSrv;
+
/** {@code TCP_NODELAY} option value for created sockets. */
private boolean tcpNoDelay = DFLT_TCP_NODELAY;
@@ -647,6 +691,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Socket write timeout. */
private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
+ /** Shared memory accept worker. */
+ private ShmemAcceptWorker shmemAcceptWorker;
+
/** Idle client worker. */
private IdleClientWorker idleClientWorker;
@@ -659,6 +706,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Recovery worker. */
private RecoveryWorker recoveryWorker;
+ /** Shared memory workers. */
+ private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
+
/** Clients. */
private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
@@ -668,6 +718,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Bound port. */
private int boundTcpPort = -1;
+ /** Bound port for shared memory server. */
+ private int boundTcpShmemPort = -1;
+
/** Count of selectors to use in TCP server. */
private int selectorsCnt = DFLT_SELECTORS_CNT;
@@ -811,6 +864,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
 /**
+ * Sets local port to accept shared memory connections.
+ * <p>
+ * If set to {@code -1} shared memory communication will be disabled.
+ * <p>
+ * If not provided, default value is {@link #DFLT_SHMEM_PORT}.
+ *
+ * @param shmemPort Port number (must be positive), or {@code -1} to disable shared memory communication.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setSharedMemoryPort(int shmemPort) {
+ this.shmemPort = shmemPort;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSharedMemoryPort() {
+ return shmemPort;
+ }
+
+ /**
* Sets maximum idle connection timeout upon which a connection
* to client will be closed.
* <p>
@@ -1179,6 +1251,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
+ assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
assertParameter(reconCnt > 0, "reconnectCnt > 0");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
assertParameter(minBufferedMsgCnt >= 0, "minBufferedMsgCnt >= 0");
@@ -1204,6 +1277,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
+ shmemSrv = resetShmemServer();
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to start shared memory communication server.", e);
+ }
+
+ try {
// This method potentially resets local port to the value
// local node was bound to.
nioSrvr = resetNioServer();
@@ -1223,6 +1303,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
createSpiAttributeName(ATTR_ADDRS), addrs.get1(),
createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(),
createSpiAttributeName(ATTR_PORT), boundTcpPort,
+ createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null,
createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
}
catch (IOException | IgniteCheckedException e) {
@@ -1251,6 +1332,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("tcpNoDelay", tcpNoDelay));
log.debug(configInfo("sockSndBuf", sockSndBuf));
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
+ log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt));
log.debug(configInfo("connTimeout", connTimeout));
@@ -1272,6 +1354,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
+ if (shmemSrv != null) {
+ shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
+
+ new IgniteThread(shmemAcceptWorker).start();
+ }
+
nioSrvr.start();
idleClientWorker = new IdleClientWorker();
@@ -1301,6 +1389,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP);
+ // SPI can start without shmem port.
+ if (boundTcpShmemPort > 0)
+ spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);
+
spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
ctxInitLatch.countDown();
@@ -1341,7 +1433,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// If configured TCP port is busy, find first available in range.
for (int port = locPort; port < locPort + locPortRange; port++) {
try {
- MessageFactory messageFactory = new MessageFactory() {
+ MessageFactory msgFactory = new MessageFactory() {
private MessageFactory impl;
@Nullable @Override public Message create(byte type) {
@@ -1354,7 +1446,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- MessageFormatter messageFormatter = new MessageFormatter() {
+ MessageFormatter msgFormatter = new MessageFormatter() {
private MessageFormatter impl;
@Override public MessageWriter writer() {
@@ -1376,7 +1468,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter);
+ GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter);
IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
@Override public boolean apply(Message msg) {
@@ -1403,7 +1495,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.writeTimeout(sockWriteTimeout)
.filters(new GridNioCodecFilter(parser, log, true),
new GridConnectionBytesVerifyFilter(log))
- .messageFormatter(messageFormatter)
+ .messageFormatter(msgFormatter)
.skipRecoveryPredicate(skipRecoveryPred)
.build();
@@ -1435,6 +1527,55 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
}
+ /**
+ * Creates and starts a shared memory communication server on the first free port of the configured range.
+ * @return Started server, or {@code null} if shared memory is disabled ({@code shmemPort == -1}) or OS is Windows.
+ * @throws IgniteCheckedException If the server was already created or no port within the range could be bound.
+ */
+ @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
+ if (boundTcpShmemPort >= 0)
+ throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);
+
+ if (shmemPort == -1 || U.isWindows())
+ return null;
+
+ IgniteCheckedException lastEx = null;
+
+ // If configured shared memory port is busy, find first available in range (range size is shared with TCP).
+ for (int port = shmemPort; port < shmemPort + locPortRange; port++) {
+ try {
+ IpcSharedMemoryServerEndpoint srv =
+ new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName);
+
+ srv.setPort(port);
+
+ srv.omitOutOfResourcesWarning(true);
+
+ srv.start();
+
+ boundTcpShmemPort = port;
+
+ // Ack the TCP port the shared memory server was bound to.
+ if (log.isInfoEnabled())
+ log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
+ ", locHost=" + locHost + ']');
+
+ return srv;
+ }
+ catch (IgniteCheckedException e) {
+ lastEx = e;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
+ ", locHost=" + locHost + ']');
+ }
+ }
+
+ // If free port wasn't found: report the shared memory start port (the scan starts at shmemPort, not locPort).
+ throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
+ shmemPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
+ }
+
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
assert isNodeStopping();
@@ -1445,6 +1586,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (nioSrvr != null)
nioSrvr.stop();
+ U.cancel(shmemAcceptWorker);
+ U.join(shmemAcceptWorker, log);
+
U.interrupt(idleClientWorker);
U.interrupt(clientFlushWorker);
U.interrupt(sockTimeoutWorker);
@@ -1455,6 +1599,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
U.join(sockTimeoutWorker, log);
U.join(recoveryWorker, log);
+ U.cancel(shmemWorkers);
+ U.join(shmemWorkers, log);
+
+ shmemWorkers.clear();
+
// Force closing on stop (safety).
for (GridCommunicationClient client : clients.values())
client.forceClose();
@@ -1665,13 +1814,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
assert node != null;
- if (getSpiContext().localNode() == null)
+ Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
+
+ ClusterNode locNode = getSpiContext().localNode();
+
+ if (locNode == null)
throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
+ // If remote node has shared memory server enabled and has the same set of MACs
+ // then we are likely to run on the same host and shared memory communication could be tried.
+ if (shmemPort != null && U.sameMacs(locNode, node)) {
+ try {
+ return createShmemClient(node, shmemPort);
+ }
+ catch (IgniteCheckedException e) {
+ if (e.hasCause(IpcOutOfSystemResourcesException.class))
+ // Has cause or is itself the IpcOutOfSystemResourcesException.
+ LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
+ else if (getSpiContext().node(node.id()) != null)
+ LT.warn(log, null, e.getMessage());
+ else if (log.isDebugEnabled())
+ log.debug("Failed to establish shared memory connection with local node (node has left): " +
+ node.id());
+ }
+ }
+
return createTcpClient(node);
}
 /**
+ * @param node Remote node to establish the shared memory connection with.
+ * @param port Shared memory server port advertised by the remote node.
+ * @return Client on which the handshake with the remote node has completed.
+ * @throws IgniteCheckedException If the client could not be created or the handshake failed.
+ */
+ @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException {
+ int attempt = 1;
+
+ int connectAttempts = 1;
+
+ long connTimeout0 = connTimeout;
+
+ while (true) {
+ GridCommunicationClient client;
+
+ try {
+ client = new GridShmemCommunicationClient(metricsLsnr,
+ port,
+ connTimeout,
+ log,
+ getSpiContext().messageFormatter());
+ }
+ catch (IgniteCheckedException e) {
+ // Retry client creation once if it failed because of a ConnectException; otherwise rethrow.
+ if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
+ connectAttempts++;
+
+ continue;
+ }
+
+ throw e;
+ }
+
+ try {
+ safeHandshake(client, null, node.id(), connTimeout0);
+ }
+ catch (HandshakeTimeoutException e) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ ", err=" + e.getMessage() + ", client=" + client + ']');
+
+ client.forceClose();
+
+ if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
+ "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
+ ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+ ", err=" + e.getMessage() + ", client=" + client + ']');
+
+ throw e;
+ }
+ else {
+ attempt++;
+
+ connTimeout0 *= 2;
+
+ continue;
+ }
+ }
+ catch (IgniteCheckedException | RuntimeException | Error e) {
+ if (log.isDebugEnabled())
+ log.debug(
+ "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
+
+ client.forceClose();
+
+ throw e;
+ }
+
+ return client;
+ }
+ }
+
+ /**
* Establish TCP connection to remote node and returns client.
*
* @param node Remote node.
@@ -2154,6 +2400,144 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
 /**
+ * Accepts shared memory connections and starts a {@link ShmemWorker} per connection. This worker is also
+ * responsible for shutting the passed server down when stopping; no other thread shall stop it.
+ */
+ private class ShmemAcceptWorker extends GridWorker {
+ /** Shared memory server endpoint to accept connections on. */
+ private final IpcSharedMemoryServerEndpoint srv;
+
+ /**
+ * @param srv Shared memory server endpoint to accept connections on.
+ */
+ ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
+ super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);
+
+ this.srv = srv;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ try {
+ while (!Thread.interrupted()) {
+ ShmemWorker e = new ShmemWorker(srv.accept());
+
+ shmemWorkers.add(e);
+
+ new IgniteThread(e).start();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (!isCancelled())
+ U.error(log, "Shmem server failed.", e);
+ }
+ finally {
+ srv.close();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ super.cancel();
+
+ srv.close();
+ }
+ }
+
+ /**
+ * Serves one accepted shared memory endpoint by adapting it to the NIO server listener via {@link IpcToNioAdapter}.
+ */
+ private class ShmemWorker extends GridWorker {
+ /** IPC endpoint served by this worker; closed on cancel, on cleanup and when the body finishes. */
+ private final IpcEndpoint endpoint;
+
+ /**
+ * @param endpoint Accepted IPC endpoint to serve.
+ */
+ private ShmemWorker(IpcEndpoint endpoint) {
+ super(gridName, "shmem-worker", TcpCommunicationSpi.this.log);
+
+ this.endpoint = endpoint;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ try {
+ MessageFactory msgFactory = new MessageFactory() {
+ private MessageFactory impl;
+
+ @Nullable @Override public Message create(byte type) {
+ if (impl == null)
+ impl = getSpiContext().messageFactory();
+
+ assert impl != null;
+
+ return impl.create(type);
+ }
+ };
+
+ MessageFormatter msgFormatter = new MessageFormatter() {
+ private MessageFormatter impl;
+
+ @Override public MessageWriter writer() {
+ if (impl == null)
+ impl = getSpiContext().messageFormatter();
+
+ assert impl != null;
+
+ return impl.writer();
+ }
+
+ @Override public MessageReader reader(MessageFactory factory) {
+ if (impl == null)
+ impl = getSpiContext().messageFormatter();
+
+ assert impl != null;
+
+ return impl.reader(factory);
+ }
+ };
+
+ IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
+ metricsLsnr,
+ log,
+ endpoint,
+ srvLsnr,
+ msgFormatter,
+ new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true),
+ new GridConnectionBytesVerifyFilter(log)
+ );
+
+ adapter.serve();
+ }
+ finally {
+ shmemWorkers.remove(this);
+
+ endpoint.close();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ super.cancel();
+
+ endpoint.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void cleanup() {
+ super.cleanup();
+
+ endpoint.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ShmemWorker.class, this);
+ }
+ }
+
+ /**
*
*/
private class IdleClientWorker extends IgniteSpiThread {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 5c80e6e..3c6b64e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -44,6 +44,14 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public int getLocalPort();
/**
+ * Gets local port for shared memory communication.
+ *
+ * @return Port number.
+ */
+ @MXBeanDescription("Shared memory endpoint port number.")
+ public int getSharedMemoryPort();
+
+ /**
* Gets maximum number of local ports tried if all previously
* tried ports are occupied.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 96abe5f..8031315 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -50,6 +50,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
commSpi.setSocketWriteTimeout(1000);
+ commSpi.setSharedMemoryPort(-1);
cfg.setCommunicationSpi(commSpi);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index ed9e0cf..744635d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -115,6 +115,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
commSpi.setLocalAddress("127.0.0.1");
commSpi.setLocalPort(commLocPort);
commSpi.setLocalPortRange(1);
+ commSpi.setSharedMemoryPort(-1);
cfg.setCommunicationSpi(commSpi);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index ea51aff..2d3f506 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -37,10 +37,23 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
/** */
public static final int IDLE_CONN_TIMEOUT = 2000;
+ /** */
+ private final boolean useShmem;
+
+ /**
+ * @param useShmem If {@code false}, each SPI is created with shared memory port set to {@code -1}.
+ */
+ protected GridTcpCommunicationSpiAbstractTest(boolean useShmem) {
+ this.useShmem = useShmem;
+ }
+
/** {@inheritDoc} */
@Override protected CommunicationSpi getSpi(int idx) {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
+ if (!useShmem)
+ spi.setSharedMemoryPort(-1);
+
spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT);
spi.setTcpNoDelay(tcpNoDelay());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index c038ee7..26e1120 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -181,8 +181,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
if (load) {
loadFut = GridTestUtils.runMultiThreadedAsync(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
+ @Override public Long call() throws Exception {
long dummyRes = 0;
List<String> list = new ArrayList<>();
@@ -300,6 +299,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
spi.setLocalPort(port++);
spi.setIdleConnectionTimeout(60_000);
spi.setConnectTimeout(10_000);
+ spi.setSharedMemoryPort(-1);
return spi;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index e7ae957..9909d76 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -55,6 +55,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
/** Message id sequence. */
private AtomicLong msgId = new AtomicLong();
+ /** */
+ private final boolean useShmem;
+
/** SPI resources. */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -80,9 +83,12 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
}
/**
+ * @param useShmem If {@code false}, each SPI is created with shared memory port set to {@code -1}.
*/
- public GridTcpCommunicationSpiMultithreadedSelfTest() {
+ protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
super(false);
+
+ this.useShmem = useShmem;
}
/**
@@ -413,6 +419,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
private CommunicationSpi<Message> newCommunicationSpi() {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
+ if (!useShmem)
+ spi.setSharedMemoryPort(-1);
+
spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
new file mode 100644
index 0000000..590b426
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+/**
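+ * Runs {@link GridTcpCommunicationSpiMultithreadedSelfTest} with the shared memory flag set to {@code true}.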
+ */
+public class GridTcpCommunicationSpiMultithreadedShmemTest extends GridTcpCommunicationSpiMultithreadedSelfTest {
+ /** */
+ public GridTcpCommunicationSpiMultithreadedShmemTest() {
+ super(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index c0f0b11..1a4ba22 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -324,6 +324,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
spi.setTcpNoDelay(true);
spi.setAckSendThreshold(ackCnt);
spi.setMessageQueueLimit(queueLimit);
+ spi.setSharedMemoryPort(-1);
return spi;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 7463388..5d3afd9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -608,6 +608,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
protected TcpCommunicationSpi getSpi(int idx) {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
+ spi.setSharedMemoryPort(-1);
spi.setLocalPort(port++);
spi.setIdleConnectionTimeout(10_000);
spi.setConnectTimeout(10_000);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
new file mode 100644
index 0000000..5746a3c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.testframework.junits.spi.*;
+
+/**
+ * Runs {@link GridTcpCommunicationSpiAbstractTest} with the shared memory flag set to {@code true}.
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiShmemSelfTest extends GridTcpCommunicationSpiAbstractTest {
+ /**
+ *
+ */
+ public GridTcpCommunicationSpiShmemSelfTest() {
+ super(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean tcpNoDelay() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
index 32bced2..c27a86f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java
@@ -24,6 +24,13 @@ import org.apache.ignite.testframework.junits.spi.*;
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
public class GridTcpCommunicationSpiTcpSelfTest extends GridTcpCommunicationSpiAbstractTest {
+ /**
+ *
+ */
+ public GridTcpCommunicationSpiTcpSelfTest() {
+ super(false);
+ }
+
/** {@inheritDoc} */
@Override protected boolean tcpNoDelay() {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 1d3bfcd..ff86bda 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -38,10 +38,12 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpNoDelayOffSelfTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiShmemSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiStartStopSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
index 9bcd5de..a1535ed 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -186,6 +188,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
cfg.setFileSystemConfiguration(igfsCfg);
cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
cfg.setLocalHost(U.getLocalHost().getHostAddress());
+ cfg.setCommunicationSpi(communicationSpi());
G.start(cfg);
}
@@ -211,6 +214,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
cfg.setFileSystemConfiguration(igfsConfiguration(gridName));
cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
cfg.setLocalHost("127.0.0.1");
+ cfg.setCommunicationSpi(communicationSpi());
return cfg;
}
@@ -270,6 +274,15 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
return cfg;
}
+ /** @return Communication SPI. */
+ private CommunicationSpi communicationSpi() {
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ return commSpi;
+ }
+
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
G.stopAll(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index b089995..8c33679 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.processors.hadoop.igfs.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -279,6 +281,8 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
cfg.setFileSystemConfiguration(igfsCfg);
cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+ cfg.setCommunicationSpi(communicationSpi());
+
G.start(cfg);
}
@@ -314,6 +318,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
cfg.setCacheConfiguration(cacheConfiguration());
cfg.setFileSystemConfiguration(fsConfiguration(gridName));
cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+ cfg.setCommunicationSpi(communicationSpi());
return cfg;
}
@@ -371,6 +376,15 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
return cfg;
}
+ /** @return Communication SPI. */
+ private CommunicationSpi communicationSpi() {
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ return commSpi;
+ }
+
/**
* Case #SecondaryFileSystemProvider(null, path)
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..7fda532 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -94,6 +94,12 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
cfg.setHadoopConfiguration(hadoopConfiguration(gridName));
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
discoSpi.setIpFinder(IP_FINDER);
[07/29] incubator-ignite git commit: ignite-989 Fix multicast for
client discovery
Posted by ag...@apache.org.
ignite-989 Fix multicast for client discovery
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9ad3617d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9ad3617d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9ad3617d
Branch: refs/heads/ignite-389-ipc
Commit: 9ad3617d39f02eb98ef01f332e2aac20f97870c2
Parents: 97d0bc1
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jun 3 12:06:51 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 3 12:11:26 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 3 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 31 -----
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 56 +++++++-
.../tcp/ipfinder/TcpDiscoveryIpFinder.java | 10 +-
.../TcpDiscoveryMulticastIpFinder.java | 47 +++++--
.../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 1 +
.../gce/TcpDiscoveryGoogleStorageIpFinder.java | 43 ++++---
8 files changed, 244 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e672d64..6a5b644 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -166,6 +166,9 @@ class ClientImpl extends TcpDiscoveryImpl {
msgWorker = new MessageWorker();
msgWorker.start();
+ if (spi.ipFinder.isShared())
+ registerLocalNodeAddress();
+
try {
joinLatch.await();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 57c13d6..698835f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -272,37 +272,6 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.printStartInfo();
}
- /**
- * @throws IgniteSpiException If failed.
- */
- @SuppressWarnings("BusyWait")
- private void registerLocalNodeAddress() throws IgniteSpiException {
- // Make sure address registration succeeded.
- while (true) {
- try {
- spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
-
- // Success.
- break;
- }
- catch (IllegalStateException e) {
- throw new IgniteSpiException("Failed to register local node address with IP finder: " +
- locNode.socketAddresses(), e);
- }
- catch (IgniteSpiException e) {
- LT.error(log, e, "Failed to register local node address in IP finder on start " +
- "(retrying every 2000 ms).");
- }
-
- try {
- U.sleep(2000);
- }
- catch (IgniteInterruptedCheckedException e) {
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
- }
- }
-
/** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
spiCtx.registerPort(tcpSrvr.port, TCP);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f285279..b7e9e53 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.discovery.*;
import org.apache.ignite.spi.discovery.tcp.internal.*;
@@ -58,7 +60,7 @@ abstract class TcpDiscoveryImpl {
}
/**
- *
+ * @return Local node ID.
*/
public UUID getLocalNodeId() {
return spi.getLocalNodeId();
@@ -78,42 +80,47 @@ abstract class TcpDiscoveryImpl {
public abstract void dumpDebugInfo(IgniteLogger log);
/**
- *
+ * @return SPI state string.
*/
public abstract String getSpiState();
/**
- *
+ * @return Message worker queue current size.
*/
public abstract int getMessageWorkerQueueSize();
/**
- *
+ * @return Coordinator ID.
*/
public abstract UUID getCoordinator();
/**
- *
+ * @return Collection of remote nodes.
*/
public abstract Collection<ClusterNode> getRemoteNodes();
/**
* @param nodeId Node id.
+ * @return Node with given ID or {@code null} if node is not found.
*/
@Nullable public abstract ClusterNode getNode(UUID nodeId);
/**
* @param nodeId Node id.
+ * @return {@code true} if node is alive, {@code false} otherwise.
*/
public abstract boolean pingNode(UUID nodeId);
/**
+ * Tells discovery SPI to disconnect from topology.
*
+ * @throws IgniteSpiException If failed.
*/
public abstract void disconnect() throws IgniteSpiException;
/**
* @param msg Message.
+ * @throws IgniteException If failed.
*/
public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
@@ -124,16 +131,18 @@ abstract class TcpDiscoveryImpl {
/**
* @param gridName Grid name.
+ * @throws IgniteSpiException If failed.
*/
public abstract void spiStart(@Nullable String gridName) throws IgniteSpiException;
/**
- *
+ * @throws IgniteSpiException If failed.
*/
public abstract void spiStop() throws IgniteSpiException;
/**
* @param spiCtx Spi context.
+ * @throws IgniteSpiException If failed.
*/
public abstract void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException;
@@ -164,7 +173,40 @@ abstract class TcpDiscoveryImpl {
public abstract void brakeConnection();
/**
- * FOR TEST PURPOSE ONLY!
+ * <strong>FOR TEST ONLY!!!</strong>
+ *
+ * @return Worker thread.
*/
protected abstract IgniteSpiThread workerThread();
+
+ /**
+ * @throws IgniteSpiException If failed.
+ */
+ @SuppressWarnings("BusyWait")
+ protected final void registerLocalNodeAddress() throws IgniteSpiException {
+ // Make sure address registration succeeded.
+ while (true) {
+ try {
+ spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+ // Success.
+ break;
+ }
+ catch (IllegalStateException e) {
+ throw new IgniteSpiException("Failed to register local node address with IP finder: " +
+ locNode.socketAddresses(), e);
+ }
+ catch (IgniteSpiException e) {
+ LT.error(log, e, "Failed to register local node address in IP finder on start " +
+ "(retrying every 2000 ms).");
+ }
+
+ try {
+ U.sleep(2000);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteSpiException("Thread has been interrupted.", e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
index 51ad7b4..95758e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
@@ -31,7 +31,7 @@ public interface TcpDiscoveryIpFinder {
* method is completed, SPI context can be stored for future access.
*
* @param spiCtx Spi context.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException;
@@ -46,7 +46,7 @@ public interface TcpDiscoveryIpFinder {
* Initializes addresses discovery SPI binds to.
*
* @param addrs Addresses discovery SPI binds to.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
@@ -54,7 +54,7 @@ public interface TcpDiscoveryIpFinder {
* Gets all addresses registered in this finder.
*
* @return All known addresses, potentially empty, but never {@code null}.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException;
@@ -76,7 +76,7 @@ public interface TcpDiscoveryIpFinder {
* is already registered.
*
* @param addrs Addresses to register. Not {@code null} and not empty.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
@@ -87,7 +87,7 @@ public interface TcpDiscoveryIpFinder {
* registered quietly (just no-op).
*
* @param addrs Addresses to unregister. Not {@code null} and not empty.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 45d0816..a992620 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -26,6 +26,8 @@ import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.jetbrains.annotations.*;
@@ -254,6 +256,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
"(it is recommended in production to specify at least one address in " +
"TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)");
+ boolean clientMode;
+
+ if (ignite != null) { // Can be null if used in tests without starting Ignite.
+ DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi();
+
+ if (!(discoSpi instanceof TcpDiscoverySpi))
+ throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " +
+ "TcpDiscoverySpi: " + discoSpi);
+
+ clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode();
+ }
+ else
+ clientMode = false;
+
InetAddress mcastAddr;
try {
@@ -296,7 +312,8 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
if (!addr.isLoopbackAddress()) {
try {
- addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
+ if (!clientMode)
+ addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
reqItfs.add(addr);
}
@@ -309,20 +326,24 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
}
}
- if (addrSnds.isEmpty()) {
- try {
- // Create non-bound socket if local host is loopback or failed to create sockets explicitly
- // bound to interfaces.
- addrSnds.add(new AddressSender(mcastAddr, null, addrs));
- }
- catch (IOException e) {
- throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
- ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+ if (!clientMode) {
+ if (addrSnds.isEmpty()) {
+ try {
+ // Create non-bound socket if local host is loopback or failed to create sockets explicitly
+ // bound to interfaces.
+ addrSnds.add(new AddressSender(mcastAddr, null, addrs));
+ }
+ catch (IOException e) {
+ throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
+ ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+ }
}
- }
- for (AddressSender addrSnd :addrSnds)
- addrSnd.start();
+ for (AddressSender addrSnd : addrSnds)
+ addrSnd.start();
+ }
+ else
+ assert addrSnds.isEmpty() : addrSnds;
Collection<InetSocketAddress> ret;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
new file mode 100644
index 0000000..d1b6232
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
+ /** */
+ private boolean forceSrv;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(new TcpDiscoveryMulticastIpFinder());
+
+ if (getTestGridName(1).equals(gridName)) {
+ cfg.setClientMode(true);
+
+ spi.setForceServerMode(forceSrv);
+ }
+
+ cfg.setDiscoverySpi(spi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinWithMulticast() throws Exception {
+ joinWithMulticast();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinWithMulticastForceServer() throws Exception {
+ forceSrv = true;
+
+ joinWithMulticast();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void joinWithMulticast() throws Exception {
+ Ignite ignite0 = startGrid(0);
+
+ assertSpi(ignite0, false);
+
+ Ignite ignite1 = startGrid(1);
+
+ assertSpi(ignite1, !forceSrv);
+
+ assertTrue(ignite1.configuration().isClientMode());
+
+ assertEquals(2, ignite0.cluster().nodes().size());
+ assertEquals(2, ignite1.cluster().nodes().size());
+
+ Ignite ignite2 = startGrid(2);
+
+ assertSpi(ignite2, false);
+
+ assertEquals(3, ignite0.cluster().nodes().size());
+ assertEquals(3, ignite1.cluster().nodes().size());
+ assertEquals(3, ignite2.cluster().nodes().size());
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param client Expected client mode flag.
+ */
+ private void assertSpi(Ignite ignite, boolean client) {
+ DiscoverySpi spi = ignite.configuration().getDiscoverySpi();
+
+ assertSame(TcpDiscoverySpi.class, spi.getClass());
+
+ TcpDiscoverySpi spi0 = (TcpDiscoverySpi)spi;
+
+ assertSame(TcpDiscoveryMulticastIpFinder.class, spi0.getIpFinder().getClass());
+
+ assertEquals(client, spi0.isClientMode());
+
+ Collection<Object> addrSnds = GridTestUtils.getFieldValue(spi0.getIpFinder(), "addrSnds");
+
+ assertNotNull(addrSnds);
+
+ if (client)
+ assertTrue(addrSnds.isEmpty()); // Check client does not send its address.
+ else
+ assertFalse(addrSnds.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index dc35b24..ea5a7ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -53,6 +53,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
+ suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
return suite;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9ad3617d/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
index b496f60..48991e8 100644
--- a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
+++ b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
@@ -68,34 +68,37 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
* Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}.
*/
public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapter {
- /* Default object's content. */
+ /** Default object's content. */
private final static ByteArrayInputStream OBJECT_CONTENT = new ByteArrayInputStream(new byte[0]);
/** Grid logger. */
@LoggerResource
private IgniteLogger log;
- /* Google Cloud Platform's project name.*/
+ /** Google Cloud Platform's project name.*/
private String projectName;
- /* Google Storage bucket name. */
+ /** Google Storage bucket name. */
private String bucketName;
- /* Service account p12 private key file name. */
- private String serviceAccountP12FilePath;
+ /** Service account p12 private key file name. */
+ private String srvcAccountP12FilePath;
- /* Service account id. */
- private String serviceAccountId;
+ /** Service account id. */
+ private String srvcAccountId;
- /* Google storage. */
+ /** Google storage. */
private Storage storage;
- /* Init routine guard. */
+ /** Init routine guard. */
private final AtomicBoolean initGuard = new AtomicBoolean();
- /* Init routine latch. */
+ /** Init routine latch. */
private final CountDownLatch initLatch = new CountDownLatch(1);
+ /**
+ *
+ */
public TcpDiscoveryGoogleStorageIpFinder() {
setShared(true);
}
@@ -221,7 +224,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
*/
@IgniteSpiConfiguration(optional = false)
public void setServiceAccountP12FilePath(String p12FileName) {
- this.serviceAccountP12FilePath = p12FileName;
+ this.srvcAccountP12FilePath = p12FileName;
}
/**
@@ -235,7 +238,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
*/
@IgniteSpiConfiguration(optional = false)
public void setServiceAccountId(String id) {
- this.serviceAccountId = id;
+ this.srvcAccountId = id;
}
/**
@@ -245,13 +248,13 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
*/
private void init() throws IgniteSpiException {
if (initGuard.compareAndSet(false, true)) {
- if (serviceAccountId == null ||
- serviceAccountP12FilePath == null ||
+ if (srvcAccountId == null ||
+ srvcAccountP12FilePath == null ||
projectName == null ||
bucketName == null) {
throw new IgniteSpiException(
"One or more of the required parameters is not set [serviceAccountId=" +
- serviceAccountId + ", serviceAccountP12FilePath=" + serviceAccountP12FilePath + ", projectName=" +
+ srvcAccountId + ", serviceAccountP12FilePath=" + srvcAccountP12FilePath + ", projectName=" +
projectName + ", bucketName=" + bucketName + "]");
}
@@ -265,12 +268,12 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
throw new IgniteSpiException(e);
}
- GoogleCredential credential;
+ GoogleCredential cred;
try {
- credential = new GoogleCredential.Builder().setTransport(httpTransport)
- .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(serviceAccountId)
- .setServiceAccountPrivateKeyFromP12File(new File(serviceAccountP12FilePath))
+ cred = new GoogleCredential.Builder().setTransport(httpTransport)
+ .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(srvcAccountId)
+ .setServiceAccountPrivateKeyFromP12File(new File(srvcAccountP12FilePath))
.setServiceAccountScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL)).build();
}
@@ -279,7 +282,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
}
try {
- storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), credential)
+ storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), cred)
.setApplicationName(projectName).build();
}
catch (Exception e) {
[06/29] incubator-ignite git commit: IGNITE-983: Added tests.
Posted by ag...@apache.org.
IGNITE-983: Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bafad996
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bafad996
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bafad996
Branch: refs/heads/ignite-389-ipc
Commit: bafad99677ff63431adfc9ca9c3e3a3897447c25
Parents: 11c0b90
Author: AKuznetsov <ak...@gridgain.com>
Authored: Wed Jun 3 15:25:49 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Wed Jun 3 15:25:49 2015 +0700
----------------------------------------------------------------------
...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++++++++++++++
.../IgniteCacheWithIndexingTestSuite.java | 2 +
2 files changed, 106 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bafad996/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
new file mode 100644
index 0000000..967a466
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheConfigurationPrimitiveTypesSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimitiveTypes() throws Exception {
+ Ignite ignite = startGrid(1);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("c1");
+
+ ccfg.setIndexedTypes(
+ byte.class, byte.class,
+ short.class, short.class,
+ int.class, int.class,
+ long.class, long.class,
+ float.class, float.class,
+ double.class, double.class,
+ boolean.class, boolean.class);
+
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache(ccfg);
+
+ byte b = 1;
+ cache.put(b, b);
+
+ short s = 2;
+ cache.put(s, s);
+
+ int i = 3;
+ cache.put(i, i);
+
+ long l = 4;
+ cache.put(l, l);
+
+ float f = 5;
+ cache.put(f, f);
+
+ double d = 6;
+ cache.put(d, d);
+
+ boolean bool = true;
+ cache.put(bool, bool);
+
+ assert cache.query(new ScanQuery<>()).getAll().size() == 7;
+
+ assert cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size() == 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bafad996/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 240caff..67ebda9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -51,6 +51,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
suite.addTestSuite(CacheConfigurationP2PTest.class);
+ suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class);
+
return suite;
}
}
[17/29] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-981' into ignite-sprint-5
Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-981' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae5189ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae5189ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae5189ab
Branch: refs/heads/ignite-389-ipc
Commit: ae5189ab9513120258add9769e8cfc88e1c6aad8
Parents: 1b12bb4 1603fe5
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:42:36 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:42:36 2015 +0300
----------------------------------------------------------------------
.../dht/GridDhtPartitionTopologyImpl.java | 8 ++-
.../GridDhtPartitionsExchangeFuture.java | 10 +++-
.../cache/IgniteDynamicCacheStartSelfTest.java | 62 ++++++++++++++++++++
3 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[22/29] incubator-ignite git commit: # IGNITE-983. Minor fix after
review.
Posted by ag...@apache.org.
# IGNITE-983. Minor fix after review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bf3203a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bf3203a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bf3203a4
Branch: refs/heads/ignite-389-ipc
Commit: bf3203a42fbd92e5960c29f672351d20cd756897
Parents: 46b2447
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:50:16 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:50:16 2015 +0700
----------------------------------------------------------------------
...gniteCacheConfigurationPrimitiveTypesSelfTest.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf3203a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
index 967a466..e90f10c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
@@ -93,12 +93,12 @@ public class IgniteCacheConfigurationPrimitiveTypesSelfTest extends GridCommonAb
assert cache.query(new ScanQuery<>()).getAll().size() == 7;
- assert cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size() == 1;
+ assertEquals(cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size(), 1);
}
}
[10/29] incubator-ignite git commit: Merge branch 'ignite-970' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-389-ipc
Posted by ag...@apache.org.
Merge branch 'ignite-970' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-389-ipc
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a329e901
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a329e901
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a329e901
Branch: refs/heads/ignite-389-ipc
Commit: a329e901d47419d5ab5e1db55dee6d2001f9d66e
Parents: 7ee51ba 7158fb6
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Jun 3 15:22:36 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Jun 3 15:22:36 2015 -0700
----------------------------------------------------------------------
modules/core/pom.xml | 1 -
.../util/nio/GridShmemCommunicationClient.java | 151 +++++++
.../communication/tcp/TcpCommunicationSpi.java | 414 ++++++++++++++++++-
.../tcp/TcpCommunicationSpiMBean.java | 8 +
.../IgniteCacheMessageRecoveryAbstractTest.java | 1 +
.../communication/GridIoManagerBenchmark0.java | 1 +
.../spi/GridTcpSpiForwardingSelfTest.java | 1 +
.../GridTcpCommunicationSpiAbstractTest.java | 13 +
...mmunicationSpiConcurrentConnectSelfTest.java | 4 +-
...cpCommunicationSpiMultithreadedSelfTest.java | 21 +-
...pCommunicationSpiMultithreadedShmemTest.java | 28 ++
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 +
...GridTcpCommunicationSpiRecoverySelfTest.java | 1 +
.../GridTcpCommunicationSpiShmemSelfTest.java | 38 ++
.../tcp/GridTcpCommunicationSpiTcpSelfTest.java | 7 +
.../IgniteSpiCommunicationSelfTestSuite.java | 2 +
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 13 +
...oopSecondaryFileSystemConfigurationTest.java | 14 +
...IgniteHadoopFileSystemHandshakeSelfTest.java | 7 +
.../IgniteHadoopFileSystemIpcCacheSelfTest.java | 7 +
.../hadoop/HadoopAbstractSelfTest.java | 6 +
21 files changed, 718 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
[20/29] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-5' into ignite-sprint-5
Posted by ag...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/922c1c44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/922c1c44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/922c1c44
Branch: refs/heads/ignite-389-ipc
Commit: 922c1c445b2519dbea1c7cb68145c05b2c063d41
Parents: e625709 7501025
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:14:20 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:14:20 2015 +0700
----------------------------------------------------------------------
.../src/main/java/org/apache/ignite/internal/IgniteKernal.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
[08/29] incubator-ignite git commit: ignite-981 Do not access cache
in exchange future before cache is ready
Posted by ag...@apache.org.
ignite-981 Do not access cache in exchange future before cache is ready
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1603fe50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1603fe50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1603fe50
Branch: refs/heads/ignite-389-ipc
Commit: 1603fe502b03d5f3e57e7837e14f0d33af002236
Parents: 97d0bc1
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jun 3 13:21:32 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 3 13:34:01 2015 +0300
----------------------------------------------------------------------
.../dht/GridDhtPartitionTopologyImpl.java | 8 ++-
.../GridDhtPartitionsExchangeFuture.java | 10 +++-
.../cache/IgniteDynamicCacheStartSelfTest.java | 62 ++++++++++++++++++++
3 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1ae4ae7..68652c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -740,7 +740,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
- ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']';
+ ", cache=" + cctx.name() +
+ ", started=" + cctx.started() +
+ ", stopping=" + stopping +
+ ", locNodeId=" + cctx.localNode().id() +
+ ", locName=" + cctx.gridName() + ']';
GridDhtPartitionFullMap m = node2part;
@@ -758,6 +762,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
+ assert partMap != null;
+
lock.writeLock().lock();
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index db43c6c..e0bfee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -902,8 +902,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
id.topologyVersion());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
- m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+ if (!cacheCtx.isLocal()) {
+ AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+ boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
+
+ if (ready)
+ m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+ }
}
// It is important that client topologies be added after contexts.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 095221e..db9e6a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -68,6 +69,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
private boolean testAttribute = true;
/** */
+ private boolean client;
+
+ /** */
private boolean daemon;
/**
@@ -85,6 +89,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ if (client) {
+ cfg.setClientMode(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ }
+
cfg.setUserAttributes(F.asMap(TEST_ATTRIBUTE_NAME, testAttribute));
CacheConfiguration cacheCfg = new CacheConfiguration();
@@ -1024,4 +1034,56 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
stopGrid(nodeCount());
}
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartStopWithClientJoin() throws Exception {
+ Ignite ignite1 = ignite(1);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ client = true;
+
+ int iter = 0;
+
+ while (!stop.get()) {
+ if (iter % 10 == 0)
+ log.info("Client start/stop iteration: " + iter);
+
+ iter++;
+
+ try (Ignite ignite = startGrid(nodeCount())) {
+ assertTrue(ignite.configuration().isClientMode());
+ }
+ }
+
+ return null;
+ }
+ }, 1, "client-start-stop");
+
+ try {
+ long stopTime = U.currentTimeMillis() + 30_000;
+
+ int iter = 0;
+
+ while (System.currentTimeMillis() < stopTime) {
+ if (iter % 10 == 0)
+ log.info("Cache start/stop iteration: " + iter);
+
+ try (IgniteCache<Object, Object> cache = ignite1.getOrCreateCache("cache-" + iter)) {
+ assertNotNull(cache);
+ }
+
+ iter++;
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ fut.get();
+ }
}
[05/29] incubator-ignite git commit: IGNITE-983: Added support for
primitive types.
Posted by ag...@apache.org.
IGNITE-983: Added support for primitive types.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/11c0b904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/11c0b904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/11c0b904
Branch: refs/heads/ignite-389-ipc
Commit: 11c0b904a934c31ac936a8793cd11bf34af7634b
Parents: 97d0bc1
Author: AKuznetsov <ak...@gridgain.com>
Authored: Wed Jun 3 14:26:49 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Wed Jun 3 14:26:49 2015 +0700
----------------------------------------------------------------------
.../org/apache/ignite/configuration/CacheConfiguration.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11c0b904/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 2c7d8c1..8b1e1a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1664,7 +1664,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
"Number of indexed types is expected to be even. Refer to method javadoc for details.");
- this.indexedTypes = indexedTypes;
+ int len = indexedTypes.length;
+
+ Class<?>[] newIndexedTypes = new Class<?>[len];
+
+ for (int i = 0; i < len; i++)
+ newIndexedTypes[i] = U.box(indexedTypes[i]);
+
+ this.indexedTypes = newIndexedTypes;
return this;
}
[15/29] incubator-ignite git commit: # ignite-991 minor
Posted by ag...@apache.org.
# ignite-991 minor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5ee09f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5ee09f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5ee09f0
Branch: refs/heads/ignite-389-ipc
Commit: b5ee09f0db550dc6a29c03e07473465d4ac767ab
Parents: 38c084a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:34:19 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:34:19 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/GridDhtPartitionTopologyImpl.java | 4 +++-
.../dht/preloader/GridDhtPartitionsExchangeFuture.java | 1 +
.../cache/IgniteDynamicCacheWithConfigStartSelfTest.java | 5 ++---
3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index af121c3..2656990 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -668,7 +668,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
- ", allIds=" + allIds + ", node2part=" + node2part + ", cache=" + cctx.name() + ']';
+ ", allIds=" + allIds +
+ ", node2part=" + node2part +
+ ", cache=" + cctx.name() + ']';
Collection<UUID> nodeIds = part2node.get(p);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a03e2e8..fdaded1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -293,6 +293,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param cacheId Cache ID to check.
+ * @param topVer Topology version.
* @return {@code True} if cache was added during this exchange.
*/
public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
index dcd6a69..6386f8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -48,7 +48,7 @@ public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstrac
cfg.setDiscoverySpi(discoSpi);
if (client)
- cfg.setCacheConfiguration(cacheConfiguration(gridName));
+ cfg.setCacheConfiguration(cacheConfiguration());
cfg.setClientMode(client);
@@ -56,10 +56,9 @@ public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstrac
}
/**
- * @param cacheName Cache name.
* @return Cache configuration.
*/
- protected CacheConfiguration cacheConfiguration(String cacheName) {
+ private CacheConfiguration cacheConfiguration() {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
ccfg.setIndexedTypes(String.class, String.class);
[09/29] incubator-ignite git commit: # IGNITE-983 Fixed logic to
support null value.
Posted by ag...@apache.org.
# IGNITE-983 Fixed logic to support null value.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/51d4737a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/51d4737a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/51d4737a
Branch: refs/heads/ignite-389-ipc
Commit: 51d4737ab70a2962ec8ef3f300317e1223d52ab3
Parents: bafad99
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 00:17:27 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 00:17:27 2015 +0700
----------------------------------------------------------------------
.../ignite/configuration/CacheConfiguration.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51d4737a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 8b1e1a5..1aa4fd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1664,14 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
"Number of indexed types is expected to be even. Refer to method javadoc for details.");
- int len = indexedTypes.length;
+ if (indexedTypes != null) {
+ int len = indexedTypes.length;
- Class<?>[] newIndexedTypes = new Class<?>[len];
+ Class<?>[] newIndexedTypes = new Class<?>[len];
- for (int i = 0; i < len; i++)
- newIndexedTypes[i] = U.box(indexedTypes[i]);
+ for (int i = 0; i < len; i++)
+ newIndexedTypes[i] = U.box(indexedTypes[i]);
- this.indexedTypes = newIndexedTypes;
+ this.indexedTypes = newIndexedTypes;
+ }
+ else
+ this.indexedTypes = null;
return this;
}
[02/29] incubator-ignite git commit: # ignite-970
Posted by ag...@apache.org.
# ignite-970
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/39ce1cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/39ce1cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/39ce1cbf
Branch: refs/heads/ignite-389-ipc
Commit: 39ce1cbfe190a709c6a2711e42160727fb01ce02
Parents: 104a13f
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 14:37:42 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 14:37:42 2015 +0300
----------------------------------------------------------------------
.../loadtests/communication/GridIoManagerBenchmark0.java | 1 +
.../ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java | 7 +++++++
.../ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java | 7 +++++++
3 files changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39ce1cbf/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index 422d608..ea5b716 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -455,6 +455,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
spi.setTcpNoDelay(true);
spi.setConnectionBufferSize(0);
+ spi.setSharedMemoryPort(-1);
info("Comm SPI: " + spi);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39ce1cbf/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
index 7cea968..a89e586 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.hadoop.fs.v2.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -197,6 +198,12 @@ public class IgniteHadoopFileSystemHandshakeSelfTest extends IgfsCommonAbstractT
cfg.setDiscoverySpi(discoSpi);
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
metaCacheCfg.setName("replicated");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39ce1cbf/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
index 2c17ba9..6773366 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -86,6 +87,12 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe
cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
cnt++;
return cfg;
[12/29] incubator-ignite git commit: #sprint-5 - Removed dumpStack
Posted by ag...@apache.org.
#sprint-5 - Removed dumpStack
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bd3abbc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bd3abbc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bd3abbc2
Branch: refs/heads/ignite-389-ipc
Commit: bd3abbc277d1dcfea7bad21ec1748ed5672ef22e
Parents: 97d0bc1
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Jun 3 18:54:31 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Jun 3 18:54:31 2015 -0700
----------------------------------------------------------------------
.../apache/ignite/internal/processors/cache/GridCacheContext.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bd3abbc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index b20e59d..46db7c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1469,9 +1469,6 @@ public class GridCacheContext<K, V> implements Externalizable {
Collection<ClusterNode> dhtNodeIds = new ArrayList<>(dhtRemoteNodes);
Collection<ClusterNode> nearNodeIds = F.isEmpty(nearRemoteNodes) ? null : new ArrayList<>(nearRemoteNodes);
- if (!F.isEmpty(nearNodeIds))
- U.dumpStack("Added near mapped nodes: " + entry + ", " + nearNodeIds);
-
entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds);
}
[25/29] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-991' into ignite-sprint-5
Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-991' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a03d111f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a03d111f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a03d111f
Branch: refs/heads/ignite-389-ipc
Commit: a03d111f26fd16faf0629bc11fe2ef201dee92f3
Parents: bf3203a b5ee09f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 14:14:29 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 14:14:29 2015 +0300
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 8 +-
.../GridDhtPartitionsExchangeFuture.java | 19 +++-
...niteDynamicCacheWithConfigStartSelfTest.java | 97 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 1 +
5 files changed, 118 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03d111f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03d111f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
[24/29] incubator-ignite git commit: # ignite-981
Posted by ag...@apache.org.
# ignite-981
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a6ea325c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a6ea325c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a6ea325c
Branch: refs/heads/ignite-389-ipc
Commit: a6ea325cf751cd9197eb80230cedc76a6a30daa4
Parents: ddcb2a3
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 13:41:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 13:41:59 2015 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsClientCacheSelfTest.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6ea325c/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
index 02166c4..9cda1b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
@@ -85,13 +85,16 @@ public class IgfsClientCacheSelfTest extends IgfsAbstractSelfTest {
cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME),
cacheConfiguration(CACHE_NAME));
- if (!gridName.equals(getTestGridName(0)))
- cfg.setClientMode(true);
-
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(IP_FINDER);
+ if (!gridName.equals(getTestGridName(0))) {
+ cfg.setClientMode(true);
+
+ disco.setForceServerMode(true);
+ }
+
cfg.setDiscoverySpi(disco);
FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
[27/29] incubator-ignite git commit: [IGNITE-218]: Wrong staging
permissions while running MR job under hadoop accelerator
Posted by ag...@apache.org.
[IGNITE-218]: Wrong staging permissions while running MR job under hadoop accelerator
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f72917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f72917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f72917
Branch: refs/heads/ignite-389-ipc
Commit: c9f72917843092d596044197cf7cb05c56a13fca
Parents: 20e5677
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jun 4 18:20:24 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jun 4 18:20:24 2015 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopTaskContext.java | 14 +-
.../igfs/IgfsSecondaryFileSystemImpl.java | 2 +-
.../fs/IgniteHadoopFileSystemCounterWriter.java | 14 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 70 ++---
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 2 +-
.../processors/hadoop/HadoopDefaultJobInfo.java | 2 +-
.../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
.../hadoop/SecondaryFileSystemProvider.java | 4 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 20 +-
.../processors/hadoop/v2/HadoopV2Job.java | 31 +-
.../hadoop/v2/HadoopV2JobResourceManager.java | 26 +-
.../hadoop/v2/HadoopV2TaskContext.java | 48 +++-
.../hadoop/HadoopClientProtocolSelfTest.java | 6 +-
.../hadoop/HadoopAbstractSelfTest.java | 14 +-
.../hadoop/HadoopCommandLineTest.java | 14 +-
.../processors/hadoop/HadoopMapReduceTest.java | 176 +++++++++++-
.../hadoop/HadoopTaskExecutionSelfTest.java | 2 +-
.../hadoop/HadoopTasksAllVersionsTest.java | 15 +-
.../processors/hadoop/HadoopTasksV1Test.java | 5 +-
.../processors/hadoop/HadoopTasksV2Test.java | 5 +-
.../processors/hadoop/HadoopV2JobSelfTest.java | 6 +-
.../collections/HadoopAbstractMapTest.java | 12 +
22 files changed, 643 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..3d2ee17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,13 +21,14 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Task context.
*/
public abstract class HadoopTaskContext {
/** */
- private final HadoopJob job;
+ protected final HadoopJob job;
/** */
private HadoopTaskInput input;
@@ -187,4 +188,15 @@ public abstract class HadoopTaskContext {
* @throws IgniteCheckedException If failed.
*/
public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+ /**
+ * Executes a callable on behalf of the job owner.
+ * In case of embedded task execution the implementation of this method
+ * will use classes loaded by the ClassLoader this HadoopTaskContext was loaded with.
+ * @param c The callable.
+ * @param <T> The return type of the Callable.
+ * @return The result of the callable.
+ * @throws IgniteCheckedException On any error in callable.
+ */
+ public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index b8095b8..44ee90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -121,6 +121,6 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
/** {@inheritDoc} */
@Override public void close() throws IgniteException {
- igfs.stop(true);
+ // No-op.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 66e9761..d910507 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import java.io.*;
@@ -37,9 +39,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
/** */
- private static final String DEFAULT_USER_NAME = "anonymous";
-
- /** */
public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
/** */
@@ -52,15 +51,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
@Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
throws IgniteCheckedException {
- Configuration hadoopCfg = new Configuration();
+ Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
hadoopCfg.set(e.getKey(), e.getValue());
String user = jobInfo.user();
- if (F.isEmpty(user))
- user = DEFAULT_USER_NAME;
+ user = IgfsUtils.fixUserName(user);
String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
@@ -72,7 +70,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
try {
- FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+ hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true);
fs.mkdirs(jobStatPath);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index c0a9ade..9d94e5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.security.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
@@ -144,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** Custom-provided sequential reads before prefetch. */
private int seqReadsBeforePrefetch;
- /** The cache was disabled when the instance was creating. */
- private boolean cacheEnabled;
-
/** {@inheritDoc} */
@Override public URI getUri() {
if (uri == null)
@@ -173,27 +169,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
/**
- * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+ * Gets non-null user name as per the Hadoop file system viewpoint.
* @return the user name, never null.
*/
- public static String getFsHadoopUser(Configuration cfg) throws IOException {
- String user = null;
-
- // -------------------------------------------
- // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
- // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
- // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
- // ugi.doAs() closure.
- if (cfg != null)
- user = cfg.get(MRJobConfig.USER_NAME);
- // -------------------------------------------
-
- if (user == null) {
- UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
-
- if (currUgi != null)
- user = currUgi.getShortUserName();
- }
+ public static String getFsHadoopUser() throws IOException {
+ UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+ String user = currUgi.getShortUserName();
user = IgfsUtils.fixUserName(user);
@@ -228,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
setConf(cfg);
- String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
-
- cacheEnabled = !cfg.getBoolean(disableCacheName, false);
-
mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
if (!IGFS_SCHEME.equals(name.getScheme()))
@@ -242,7 +220,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
uriAuthority = uri.getAuthority();
- user = getFsHadoopUser(cfg);
+ user = getFsHadoopUser();
// Override sequential reads before prefetch if needed.
seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -360,15 +338,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
@Override protected void finalize() throws Throwable {
super.finalize();
- close0();
+ close();
}
/** {@inheritDoc} */
@Override public void close() throws IOException {
- if (cacheEnabled && get(getUri(), getConf()) == this)
- return;
-
- close0();
+ if (closeGuard.compareAndSet(false, true))
+ close0();
}
/**
@@ -377,27 +353,25 @@ public class IgniteHadoopFileSystem extends FileSystem {
* @throws IOException If failed.
*/
private void close0() throws IOException {
- if (closeGuard.compareAndSet(false, true)) {
- if (LOG.isDebugEnabled())
- LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+ if (LOG.isDebugEnabled())
+ LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
- if (rmtClient == null)
- return;
+ if (rmtClient == null)
+ return;
- super.close();
+ super.close();
- rmtClient.close(false);
+ rmtClient.close(false);
- if (clientLog.isLogEnabled())
- clientLog.close();
+ if (clientLog.isLogEnabled())
+ clientLog.close();
- if (secondaryFs != null)
- U.closeQuiet(secondaryFs);
+ if (secondaryFs != null)
+ U.closeQuiet(secondaryFs);
- // Reset initialized resources.
- uri = null;
- rmtClient = null;
- }
+ // Reset initialized resources.
+ uri = null;
+ rmtClient = null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index f3fbe9c..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
uri = name;
- user = getFsHadoopUser(cfg);
+ user = getFsHadoopUser();
try {
initialize(name, cfg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
synchronized (HadoopDefaultJobInfo.class) {
if ((jobCls0 = jobCls) == null) {
- HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+ HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index d493bd4..68a9ef6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.*;
import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
import java.io.*;
+import java.net.*;
import java.util.*;
/**
@@ -57,6 +63,41 @@ public class HadoopUtils {
/** Old reducer class attribute. */
private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
+ /**
+  * Lazy per-user cache for the file systems. A value is created by the factory below the first
+  * time a key is requested; the factory disables Hadoop's own FileSystem cache for the key's
+  * scheme on a private copy of the configuration.
+  * NOTE(review): the field is final, so it can be cleared but never nulled; the place where it
+  * is cleared is not visible in this part of the class -- confirm.
+  */
+ private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+     new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+         @Override public FileSystem createValue(FsCacheKey key) {
+             try {
+                 assert key != null;
+
+                 URI uri = key.uri();
+
+                 String scheme = uri.getScheme();
+
+                 // Copy the configuration to avoid altering the external object.
+                 Configuration cfg = new Configuration(key.configuration());
+
+                 // Explicitly disable FileSystem caching for this scheme:
+                 String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+                 cfg.setBoolean(prop, true);
+
+                 return FileSystem.get(uri, cfg, key.user());
+             }
+             catch (IOException ioe) {
+                 throw new IgniteException(ioe);
+             }
+             catch (InterruptedException ie) {
+                 // Keep the interruption status visible to the callers up the stack.
+                 Thread.currentThread().interrupt();
+
+                 throw new IgniteException(ie);
+             }
+         }
+     }
+ );
+
+ /**
+ * Private no-op constructor: the class cannot be instantiated from outside.
+ */
+ private HadoopUtils() {
+ // No-op.
+ }
+
/**
* Wraps native split.
*
@@ -126,8 +167,6 @@ public class HadoopUtils {
break;
case PHASE_REDUCE:
- // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
- // See https://issues.apache.org/jira/browse/IGNITE-764
setupProgress = 1;
mapProgress = 1;
@@ -304,9 +343,242 @@ public class HadoopUtils {
}
/**
- * Constructor.
+ * Creates {@link Configuration} in a correct class loader context to avoid caching
+ * of inappropriate class loader in the Configuration object.
+ * @return New instance of {@link Configuration}.
*/
- private HadoopUtils() {
- // No-op.
+ public static Configuration safeCreateConfiguration() {
+     final Thread curThread = Thread.currentThread();
+
+     final ClassLoader prevLdr = curThread.getContextClassLoader();
+
+     // While the instance is constructed the context loader is the one that loaded Configuration.
+     curThread.setContextClassLoader(Configuration.class.getClassLoader());
+
+     try {
+         return new Configuration();
+     }
+     finally {
+         // Always put the caller's context class loader back.
+         curThread.setContextClassLoader(prevLdr);
+     }
+ }
+
+ /**
+  * Creates {@link JobConf} while the thread context class loader is temporarily switched to
+  * the loader of the {@link JobConf} class, so that an inappropriate class loader is not
+  * cached in the created object. The previous context class loader is always restored.
+  *
+  * @return New instance of {@link JobConf}.
+  */
+ public static JobConf safeCreateJobConf() {
+     final Thread curThread = Thread.currentThread();
+
+     final ClassLoader prevLdr = curThread.getContextClassLoader();
+
+     curThread.setContextClassLoader(JobConf.class.getClassLoader());
+
+     try {
+         return new JobConf();
+     }
+     finally {
+         // Always put the caller's context class loader back.
+         curThread.setContextClassLoader(prevLdr);
+     }
+ }
+
+ /**
+  * Gets the user name as per the Hadoop viewpoint: the value of the
+  * {@link MRJobConfig#USER_NAME} property when the configuration is given and the property is
+  * set, otherwise the result of {@code IgniteHadoopFileSystem.getFsHadoopUser()}.
+  *
+  * @param cfg The Hadoop job configuration, may be {@code null}.
+  * @return The user name; callers assert that it is not {@code null}.
+  * @throws IOException If the fallback user lookup fails.
+  */
+ private static String getMrHadoopUser(@Nullable Configuration cfg) throws IOException {
+     // Honor the documented contract: a null configuration simply means "no explicit user".
+     String user = cfg == null ? null : cfg.get(MRJobConfig.USER_NAME);
+
+     if (user == null)
+         user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+     return user;
+ }
+
+ /**
+  * Common method to get the V1 file system in the MapRed engine. The file system is created
+  * for the user specified in the configuration with the {@link MRJobConfig#USER_NAME} property.
+  *
+  * @param uri The file system URI; if {@code null}, the default URI of the configuration is used.
+  * @param cfg The configuration.
+  * @param doCacheFs If {@code true}, the file system is taken from the lazy cache of this class;
+  *      otherwise it is requested directly through {@code FileSystem.get(uri, cfg, user)}.
+  * @return The file system.
+  * @throws IOException If the file system could not be obtained or the thread was interrupted.
+  */
+ public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
+     final String mrUser = getMrHadoopUser(cfg);
+
+     assert mrUser != null;
+
+     if (uri == null)
+         uri = FileSystem.getDefaultUri(cfg);
+
+     final FileSystem res;
+
+     if (!doCacheFs) {
+         try {
+             res = FileSystem.get(uri, cfg, mrUser);
+         }
+         catch (InterruptedException intEx) {
+             Thread.currentThread().interrupt();
+
+             throw new IOException(intEx);
+         }
+     }
+     else {
+         try {
+             res = getWithCaching(uri, cfg, mrUser);
+         }
+         catch (IgniteException igniteEx) {
+             throw new IOException(igniteEx);
+         }
+     }
+
+     assert res != null;
+     assert !(res instanceof IgniteHadoopFileSystem) || F.eq(mrUser, ((IgniteHadoopFileSystem)res).user());
+
+     return res;
+ }
+
+ /**
+  * Key of the file system cache. Equality and hashing are based on a string built from the
+  * user, the lower-cased URI scheme and the lower-cased URI authority.
+  * <p>
+  * Note that configuration is not a part of the key. It is used solely to initialize the first
+  * instance that is created for the key.
+  */
+ public static final class FsCacheKey {
+     /** File system URI, already passed through {@link HadoopUtils#fixUri(URI, Configuration)}. */
+     private final URI uri;
+
+     /** User name. */
+     private final String usr;
+
+     /** String the equality, hash code and string form are delegated to. */
+     private final String equalityKey;
+
+     /** Configuration; not a part of the key identity. */
+     private final Configuration cfg;
+
+     /**
+      * Constructor.
+      *
+      * @param uri The file system URI, not null.
+      * @param usr The user, not null.
+      * @param cfg The configuration, not null.
+      */
+     public FsCacheKey(URI uri, String usr, Configuration cfg) {
+         assert uri != null;
+         assert usr != null;
+         assert cfg != null;
+
+         this.uri = fixUri(uri, cfg);
+         this.usr = usr;
+         this.cfg = cfg;
+
+         this.equalityKey = createEqualityKey();
+     }
+
+     /**
+      * Creates String key used for equality and hashing.
+      *
+      * @return Key of the form {@code (user)@scheme://authority}.
+      */
+     private String createEqualityKey() {
+         GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+         // Locale.ROOT keeps the key independent of the JVM default locale
+         // (e.g. the Turkish locale maps 'I' to a dotless 'i').
+         if (uri.getScheme() != null)
+             sb.a(uri.getScheme().toLowerCase(Locale.ROOT));
+
+         sb.a("://");
+
+         if (uri.getAuthority() != null)
+             sb.a(uri.getAuthority().toLowerCase(Locale.ROOT));
+
+         return sb.toString();
+     }
+
+     /**
+      * @return The URI.
+      */
+     public URI uri() {
+         return uri;
+     }
+
+     /**
+      * @return The user.
+      */
+     public String user() {
+         return usr;
+     }
+
+     /**
+      * @return The configuration.
+      */
+     public Configuration configuration() {
+         return cfg;
+     }
+
+     /** {@inheritDoc} */
+     @SuppressWarnings("SimplifiableIfStatement")
+     @Override public boolean equals(Object obj) {
+         if (obj == this)
+             return true;
+
+         if (obj == null || getClass() != obj.getClass())
+             return false;
+
+         return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+     }
+
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         return equalityKey.hashCode();
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return equalityKey;
+     }
+ }
+
+ /**
+  * Returns the file system for the given URI and user from the static lazy cache of this
+  * class, creating it on the first request.
+  *
+  * <p/>The cached file systems are keyed by a triplet {scheme, authority, user}. The
+  * Configuration is not a part of the key, so for a given key the file system is initialized
+  * only once, with the Configuration that was passed in when it was created.
+  *
+  * @param uri The file system URI.
+  * @param cfg The configuration.
+  * @param usr The user to create file system for.
+  * @return The file system: either created, or taken from the cache.
+  */
+ private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+     return fileSysLazyMap.getOrCreate(new FsCacheKey(uri, usr, cfg));
+ }
+
+ /**
+  * Builds the name of the property that disables the file system cache for a scheme.
+  *
+  * @param scheme The file system URI scheme.
+  * @return The property name. If scheme is null,
+  *      returns "fs.null.impl.disable.cache".
+  */
+ public static String disableFsCachePropertyName(@Nullable String scheme) {
+     // String concatenation renders a null scheme as "null", exactly as the "%s" format does.
+     return "fs." + scheme + ".impl.disable.cache";
+ }
+
+ /**
+ * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+ * @param uri0 The uri.
+ * @param cfg The cfg.
+ * @return Correct URI.
+ */
+ public static URI fixUri(URI uri0, Configuration cfg) {
+ if (uri0 == null)
+ return FileSystem.getDefaultUri(cfg);
+
+ String scheme = uri0.getScheme();
+ String authority = uri0.getAuthority();
+
+ if (authority == null) {
+ URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+ if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+ return dfltUri;
+ }
+
+ return uri0;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index b1a057c..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -34,7 +34,7 @@ import java.security.*;
*/
public class SecondaryFileSystemProvider {
/** Configuration of the secondary filesystem, never null. */
- private final Configuration cfg = new Configuration();
+ private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
/** The secondary filesystem URI, never null. */
private final URI uri;
@@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider {
}
// Disable caching:
- String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+ String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
cfg.setBoolean(prop, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 2e04ac1..b170125 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
/** {@inheritDoc} */
@Override public Void call() throws IgniteCheckedException {
+ // The task context is obtained first: it is the object that runs the task body as the job owner.
+ ctx = job.getTaskContext(info);
+
+ // Delegate the actual work (call0) to the context so that it is executed via runAsJobOwner.
+ return ctx.runAsJobOwner(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ call0();
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Implements actual task running.
+ * @throws IgniteCheckedException
+ */
+ void call0() throws IgniteCheckedException {
execStartTs = U.currentTimeMillis();
Throwable err = null;
@@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
HadoopPerformanceCounter perfCntr = null;
try {
- ctx = job.getTaskContext(info);
-
perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
perfCntr.onTaskSubmit(info, submitTs);
@@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
if (ctx != null)
ctx.cleanupTaskEnvironment();
}
-
- return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index d265ca8..d754039 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.hadoop.v2;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.JobID;
@@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob {
new ConcurrentHashMap8<>();
/** Pooling task context class and thus class loading environment. */
- private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+ private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
/** All created contexts. */
private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
@@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob {
hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
- HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
-
- // Before create JobConf instance we should set new context class loader.
- Thread.currentThread().setContextClassLoader(clsLdr);
-
- jobConf = new JobConf();
+ jobConf = HadoopUtils.safeCreateJobConf();
HadoopFileSystemsUtils.setupFileSystems(jobConf);
@@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob {
Path jobDir = new Path(jobDirPath);
- try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+ try {
+ FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true);
+
JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
jobDir);
@@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob {
if (old != null)
return old.get();
- Class<?> cls = taskCtxClsPool.poll();
+ Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
try {
if (cls == null) {
@@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob {
// Note that the classloader identified by the task it was initially created for,
// but later it may be reused for other tasks.
HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
- "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
+ "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
- cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+ cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
fullCtxClsQueue.add(cls);
}
@@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob {
/** {@inheritDoc} */
@Override public void cleanupStagingDirectory() {
- if (rsrcMgr != null)
- rsrcMgr.cleanupStagingDirectory();
+ rsrcMgr.cleanupStagingDirectory();
+ }
+
+ /**
+ * Getter for job configuration.
+ * @return The job configuration.
+ */
+ public JobConf jobConf() {
+ return jobConf;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 6f6bfa1..2f64e77 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -40,6 +40,9 @@ import java.util.*;
* files are needed to be placed on local files system.
*/
public class HadoopV2JobResourceManager {
+ /** File type Fs disable caching property name. */
+ private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file");
+
/** Hadoop job context. */
private final JobContextImpl ctx;
@@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager {
try {
cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
- if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+ if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
}
finally {
@@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager {
stagingDir = new Path(new URI(mrDir));
if (download) {
- FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true);
if (!fs.exists(stagingDir))
- throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
- stagingDir);
+ throw new IgniteCheckedException("Failed to find map-reduce submission " +
+ "directory (does not exist): " + stagingDir);
if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
- throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
- "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+ throw new IgniteCheckedException("Failed to copy job submission directory "
+ + "contents to local file system "
+ + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+ + ", jobId=" + jobId + ']');
}
File jarJobFile = new File(jobLocDir, "job.jar");
@@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager {
}
}
else if (!jobLocDir.mkdirs())
- throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+ throw new IgniteCheckedException("Failed to create local job directory: "
+ + jobLocDir.getAbsolutePath());
setLocalFSWorkingDirectory(jobLocDir);
}
@@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager {
FileSystem dstFs = FileSystem.getLocal(cfg);
- FileSystem srcFs = srcPath.getFileSystem(cfg);
+ FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true);
if (extract) {
File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
if (!archivesPath.exists() && !archivesPath.mkdir())
throw new IOException("Failed to create directory " +
- "[path=" + archivesPath + ", jobId=" + jobId + ']');
+ "[path=" + archivesPath + ", jobId=" + jobId + ']');
File archiveFile = new File(archivesPath, locName);
@@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager {
public void cleanupStagingDirectory() {
try {
if (stagingDir != null)
- stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true);
+ HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true);
}
catch (Exception e) {
log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index dd18c66..e89feba 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import java.io.*;
+import java.security.*;
import java.util.*;
+import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -419,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
- try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+ try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false);
FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
in.seek(split.offset());
@@ -448,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
throw new IgniteCheckedException(e);
}
}
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The job owner is the user from the job info passed through {@code IgfsUtils.fixUserName}.
+  * If it equals the short name of the current {@link UserGroupInformation} user, the callable
+  * is invoked directly; otherwise it is invoked through {@code doAs} of the UGI returned by
+  * {@code UserGroupInformation.getBestUGI(null, user)}.
+  *
+  * @throws IgniteCheckedException If the current user cannot be determined or the callable fails.
+  */
+ @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+     String user = job.info().user();
+
+     user = IgfsUtils.fixUserName(user);
+
+     assert user != null;
+
+     String ugiUser;
+
+     try {
+         UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+         assert currUser != null;
+
+         ugiUser = currUser.getShortUserName();
+     }
+     catch (IOException ioe) {
+         throw new IgniteCheckedException(ioe);
+     }
+
+     try {
+         if (F.eq(user, ugiUser))
+             // if current UGI context user is the same, do direct call:
+             return c.call();
+         else {
+             UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+             return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                 @Override public T run() throws Exception {
+                     return c.call();
+                 }
+             });
+         }
+     }
+     catch (InterruptedException ie) {
+         // Keep the interruption status visible to the callers up the stack.
+         Thread.currentThread().interrupt();
+
+         throw new IgniteCheckedException(ie);
+     }
+     catch (Exception e) {
+         throw new IgniteCheckedException(e);
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
import org.apache.ignite.hadoop.mapreduce.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
* @return Configuration.
*/
private Configuration config(int port) {
- Configuration conf = new Configuration();
+ Configuration conf = HadoopUtils.safeCreateConfiguration();
setupFileSystems(conf);
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
ctx.getCounter(TestCounter.COUNTER2).increment(1);
int sum = 0;
- for (IntWritable value : values) {
+ for (IntWritable value : values)
sum += value.get();
- }
ctx.write(key, new IntWritable(sum));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..e8a0a6f 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
/** Initial REST port. */
private int restPort = REST_PORT;
+ /** Secondary file system REST endpoint configuration. */
+ protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+ static {
+ SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+ SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+ SECONDARY_REST_CFG.setPort(11500);
+ }
+
+
/** Initial classpath. */
private static String initCp;
@@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
/**
* @return IGFS configuration.
*/
- public FileSystemConfiguration igfsConfiguration() {
+ public FileSystemConfiguration igfsConfiguration() throws Exception {
FileSystemConfiguration cfg = new FileSystemConfiguration();
cfg.setName(igfsName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
index d10ee5c..c66cdf3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop;
import com.google.common.base.*;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.hadoop.fs.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.resource.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jsr166.*;
@@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
+ String cfgPath = "config/hadoop/default-config.xml";
+
+ IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
+
+ IgniteConfiguration cfg = tup.get1();
+
+ cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
+
+ igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 8a3a0ac..a1ef7ba 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.hadoop.fs.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
/**
* Test of whole cycle of map-reduce processing via Job tracker.
*/
public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+ /** IGFS block size. */
+ protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+ /** Amount of blocks to prefetch. */
+ protected static final int PREFETCH_BLOCKS = 1;
+
+ /** Amount of sequential block reads before prefetch is triggered. */
+ protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+ /** Secondary file system URI. */
+ protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+ /** Secondary file system configuration path. */
+ protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+ /** The user to run Hadoop job on behalf of. */
+ protected static final String USER = "vasya";
+
+ /** Secondary IGFS name. */
+ protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+ /** The secondary Ignite node. */
+ protected Ignite igniteSecondary;
+
+ /** The secondary Fs. */
+ protected IgfsSecondaryFileSystem secondaryFs;
+
/** {@inheritDoc} */
@Override protected int gridCount() {
return 3;
}
/**
+ * Gets the owner of a path as reported by the given IGFS instance.
+ *
+ * @param i IGFS instance to query.
+ * @param p The path.
+ * @return The owner, i.e. the value of the {@code IgfsEx.PROP_USER_NAME} property of the path info.
+ */
+ private static String getOwner(IgfsEx i, IgfsPath p) {
+ return i.info(p).property(IgfsEx.PROP_USER_NAME);
+ }
+
+ /**
+ * Gets the owner of a path in the secondary file system. The lookup is executed inside
+ * {@code IgfsUserContext.doAs(USER, ...)}.
+ *
+ * @param secFs The secondary file system.
+ * @param p The path.
+ * @return The owner, i.e. the value of the {@code IgfsEx.PROP_USER_NAME} property of the path info.
+ */
+ private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+ return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+ @Override public String apply() {
+ return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+ }
+ });
+ }
+
+ /**
+ * Asserts that both the {@code igfs} instance and {@code secondaryFs} report {@code USER}
+ * as the owner of the path.
+ *
+ * @param p The path.
+ */
+ private void checkOwner(IgfsPath p) {
+ String ownerPrim = getOwner(igfs, p);
+ assertEquals(USER, ownerPrim);
+
+ String ownerSec = getOwnerSecondary(secondaryFs, p);
+ assertEquals(USER, ownerSec);
+ }
+
+ /**
* Tests whole job execution with all phases in all combination of new and old versions of API.
* @throws Exception If fails.
*/
@@ -71,7 +144,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
JobConf jobConf = new JobConf();
jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
- jobConf.setUser("yyy");
+ jobConf.setUser(USER);
jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
//To split into about 40 items for v2
@@ -105,14 +178,20 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
checkJobStatistics(jobId);
+ final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+ checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+ checkOwner(new IgfsPath(outFile));
+
assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
- useNewReducer,
+ useNewReducer,
"blue\t200000\n" +
- "green\t150000\n" +
- "red\t100000\n" +
- "yellow\t70000\n",
- readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
- );
+ "green\t150000\n" +
+ "red\t100000\n" +
+ "yellow\t70000\n",
+ readAndSortFile(outFile)
+ );
}
}
@@ -182,7 +261,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
}
}
- final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+ final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
@@ -212,4 +291,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
}
}
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); // Secondary grid is started before the parent set-up runs.
+
+ super.beforeTest();
+ }
+
+ /**
+ * Starts a grid node with a single IGFS instance configured on it.
+ * <p>
+ * The node gets a partitioned, transactional data cache ("dataCache") and a replicated,
+ * transactional meta cache ("metaCache"), a {@code TcpDiscoverySpi} with a
+ * {@code TcpDiscoveryVmIpFinder}, local host "127.0.0.1" and no connector configuration.
+ *
+ * @param gridName Grid name.
+ * @param igfsName IGFS name.
+ * @param mode IGFS default mode.
+ * @param secondaryFs Secondary file system (optional).
+ * @param restCfg IPC endpoint configuration (optional).
+ * @return Started grid instance.
+ * @throws Exception If failed.
+ */
+ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+ @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("dataCache");
+ igfsCfg.setMetaCacheName("metaCache");
+ igfsCfg.setName(igfsName);
+ igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+ igfsCfg.setDefaultMode(mode);
+ igfsCfg.setIpcEndpointConfiguration(restCfg);
+ igfsCfg.setSecondaryFileSystem(secondaryFs);
+ igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+ igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+ CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+ dataCacheCfg.setName("dataCache");
+ dataCacheCfg.setCacheMode(PARTITIONED);
+ dataCacheCfg.setNearConfiguration(null);
+ dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+ dataCacheCfg.setBackups(0);
+ dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+ dataCacheCfg.setOffHeapMaxMemory(0);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("metaCache");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setConnectorConfiguration(null);
+
+ return G.start(cfg);
+ }
+
+ /**
+ * Extends the parent IGFS configuration with a secondary file system created from
+ * {@code SECONDARY_URI} and {@code SECONDARY_CFG}. The created file system is also
+ * stored in the {@code secondaryFs} field.
+ *
+ * @return IGFS configuration.
+ * @throws Exception If failed.
+ */
+ @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+ FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+ secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+ fsCfg.setSecondaryFileSystem(secondaryFs);
+
+ return fsCfg;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
index 8dc9830..eee5c8b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
/** {@inheritDoc} */
- @Override public FileSystemConfiguration igfsConfiguration() {
+ @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
FileSystemConfiguration cfg = super.igfsConfiguration();
cfg.setFragmentizerEnabled(false);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
index aaf0f92..6930020 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.io.*;
import java.net.*;
@@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Hadoop job.
* @throws IOException If fails.
*/
- public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+ public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
/**
* @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
igfs.info(inFile).length() - fileBlock1.length());
- HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
@@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Context with mock output.
* @throws IgniteCheckedException If fails.
*/
- private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+ private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
int taskNum, String... words) throws IgniteCheckedException {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
@@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @throws Exception If fails.
*/
public void testReduceTask() throws Exception {
- HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @throws Exception If fails.
*/
public void testCombinerTask() throws Exception {
- HadoopV2Job gridJob = getHadoopJob("/", "/");
+ HadoopJob gridJob = getHadoopJob("/", "/");
HadoopTestTaskContext ctx =
runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
@@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Context of combine task with mock output.
* @throws IgniteCheckedException If fails.
*/
- private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+ private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
throws IgniteCheckedException {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
@@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
- HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index b41a260..48e83cc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.hadoop.mapred.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.io.*;
import java.util.*;
@@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
* @return Hadoop job.
* @throws IOException If fails.
*/
- @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
setupFileSystems(jobConf);
@@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
- return new HadoopV2Job(jobId, jobInfo, log);
+ return jobInfo.createJob(jobId, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index b677c63..e73fae3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.util.*;
@@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
* @return Hadoop job.
* @throws Exception if fails.
*/
- @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
Job job = Job.getInstance();
job.setOutputKeyClass(Text.class);
@@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
- return new HadoopV2Job(jobId, jobInfo, log);
+ return jobInfo.createJob(jobId, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index ebc89f4..f3b9307 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
cfg.setMapOutputValueClass(Text.class);
cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
- HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+ HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+ HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+
+ HadoopJob job = info.createJob(id, log);
HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
null));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index b4ed5e1..9395c5e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Abstract class for maps test.
@@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
assert false;
}
+ /** {@inheritDoc} */
@Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
assert false;
}
+
+ /** {@inheritDoc} */
+ @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
+ try {
+ return c.call(); // Test stub: the callable is invoked directly by the caller's thread.
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e); // Any failure is wrapped, keeping the original exception as the cause.
+ }
+ }
}
/**
[04/29] incubator-ignite git commit: # ignite-970
Posted by ag...@apache.org.
# ignite-970
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7158fb6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7158fb6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7158fb6a
Branch: refs/heads/ignite-389-ipc
Commit: 7158fb6a4ff7b9db3afda73f75376ad3285c556c
Parents: d6f9b64
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 15:57:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 15:57:16 2015 +0300
----------------------------------------------------------------------
.../GridTcpCommunicationSpiMultithreadedSelfTest.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7158fb6a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 5d25299..dc7f344 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -62,8 +62,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
/** SPIs */
- private static final Map<UUID, CommunicationSpi<Message>> spis =
- new ConcurrentHashMap<>();
+ private static final Map<UUID, CommunicationSpi<Message>> spis = new ConcurrentHashMap<>();
/** Listeners. */
private static final Map<UUID, MessageListener> lsnrs = new HashMap<>();
@@ -85,13 +84,20 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
/**
* @param useShmem Use shared mem.
*/
- public GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
+ protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
super(false);
this.useShmem = useShmem;
}
/**
+ * Default constructor; delegates to the {@code boolean} constructor with {@code useShmem = false}.
+ */
+ public GridTcpCommunicationSpiMultithreadedSelfTest() {
+ this(false);
+ }
+
+ /**
* Accumulating listener.
*/
@SuppressWarnings({"deprecation"})
[18/29] incubator-ignite git commit: # ignite-sprint-5
Posted by ag...@apache.org.
# ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7501025d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7501025d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7501025d
Branch: refs/heads/ignite-389-ipc
Commit: 7501025d815a61ef881d86fa326bc6e17064ec0e
Parents: ae5189a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 11:01:15 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 11:01:15 2015 +0300
----------------------------------------------------------------------
.../src/main/java/org/apache/ignite/internal/IgniteKernal.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7501025d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a7dc70..a0d97c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -963,8 +963,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
sysPoolQSize = exec.getQueue().size();
}
+ String id = U.id8(localNode().id());
+
String msg = NL +
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+ " ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
[19/29] incubator-ignite git commit: Merge branch 'ignite-988' into
ignite-sprint-5
Posted by ag...@apache.org.
Merge branch 'ignite-988' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e625709b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e625709b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e625709b
Branch: refs/heads/ignite-389-ipc
Commit: e625709b2f74853ef883df6cafa46b8a2b0245f7
Parents: ae5189a 46eab5b
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:06:25 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:06:25 2015 +0700
----------------------------------------------------------------------
.../java/org/apache/ignite/internal/visor/query/VisorQueryJob.java | 2 +-
.../apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[29/29] incubator-ignite git commit: IGNITE-389 - IPC checked and API
improvements.
Posted by ag...@apache.org.
IGNITE-389 - IPC checked and API improvements.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b51f99e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b51f99e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b51f99e
Branch: refs/heads/ignite-389-ipc
Commit: 6b51f99e72eb11af25403f8ec50087c03b1f1fb7
Parents: 1d8643c
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 4 19:19:36 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 4 19:19:36 2015 -0700
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 4 +-
.../shmem/IpcSharedMemoryClientEndpoint.java | 2 +-
.../ipc/shmem/IpcSharedMemoryNativeLoader.java | 150 +++++++++++++++++--
.../shmem/IpcSharedMemoryServerEndpoint.java | 2 +-
.../IpcSharedMemoryCrashDetectionSelfTest.java | 2 +-
.../ipc/shmem/IpcSharedMemorySpaceSelfTest.java | 2 +-
.../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java | 2 +-
.../LoadWithCorruptedLibFileTestRunner.java | 2 +-
.../IpcSharedMemoryBenchmarkReader.java | 2 +-
.../IpcSharedMemoryBenchmarkWriter.java | 2 +-
.../hadoop/HadoopAbstractSelfTest.java | 1 +
.../org/apache/ignite/spark/IgniteContext.scala | 19 ++-
.../org/apache/ignite/spark/IgniteRDD.scala | 8 +-
13 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 0932212..9016b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9025,11 +9025,11 @@ public abstract class IgniteUtils {
hasShmem = false;
else {
try {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
hasShmem = true;
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException ignore) {
hasShmem = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
index 27a234f..c935c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
boolean clear = true;
try {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log);
sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
index dc00ca6..8c345f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.ipc.shmem;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
@@ -25,6 +26,8 @@ import java.net.*;
import java.nio.channels.*;
import java.security.*;
import java.util.*;
+import java.util.jar.*;
+import java.util.zip.*;
import static org.apache.ignite.internal.IgniteVersionUtils.*;
@@ -36,6 +39,9 @@ public class IpcSharedMemoryNativeLoader {
/** Library name base. */
private static final String LIB_NAME_BASE = "igniteshmem";
+ /** Library jar name base. */
+ private static final String JAR_NAME_BASE = "shmem";
+
/** Library name. */
static final String LIB_NAME = LIB_NAME_BASE + "-" + VER_STR;
@@ -84,9 +90,10 @@ public class IpcSharedMemoryNativeLoader {
}
/**
+ * @param log Logger, if available. If null, warnings will be printed out to console.
* @throws IgniteCheckedException If failed.
*/
- public static void load() throws IgniteCheckedException {
+ public static void load(IgniteLogger log) throws IgniteCheckedException {
if (loaded)
return;
@@ -94,7 +101,7 @@ public class IpcSharedMemoryNativeLoader {
if (loaded)
return;
- doLoad();
+ doLoad(log);
loaded = true;
}
@@ -103,7 +110,7 @@ public class IpcSharedMemoryNativeLoader {
/**
* @throws IgniteCheckedException If failed.
*/
- private static void doLoad() throws IgniteCheckedException {
+ private static void doLoad(IgniteLogger log) throws IgniteCheckedException {
assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class);
Collection<Throwable> errs = new ArrayList<>();
@@ -124,7 +131,7 @@ public class IpcSharedMemoryNativeLoader {
// Obtain lock on file to prevent concurrent extracts.
try (RandomAccessFile randomAccessFile = new RandomAccessFile(lockFile, "rws");
- FileLock ignored = randomAccessFile.getChannel().lock()) {
+ FileLock ignored = randomAccessFile.getChannel().lock()) {
if (extractAndLoad(errs, tmpDir, platformSpecificResourcePath()))
return;
@@ -134,6 +141,30 @@ public class IpcSharedMemoryNativeLoader {
if (extractAndLoad(errs, tmpDir, resourcePath()))
return;
+ try {
+ U.quietAndWarn(log, "Failed to load 'igniteshmem' library from classpath. Will try to load it from IGNITE_HOME.");
+
+ String igniteHome = X.resolveIgniteHome();
+
+ File shmemJar = findShmemJar(errs, igniteHome);
+
+ if (shmemJar != null) {
+ try (JarFile jar = new JarFile(shmemJar, false, JarFile.OPEN_READ)) {
+ if (extractAndLoad(errs, jar, tmpDir, platformSpecificResourcePath()))
+ return;
+
+ if (extractAndLoad(errs, jar, tmpDir, osSpecificResourcePath()))
+ return;
+
+ if (extractAndLoad(errs, jar, tmpDir, resourcePath()))
+ return;
+ }
+ }
+ }
+ catch (IgniteCheckedException ignore) {
+
+ }
+
// Failed to find the library.
assert !errs.isEmpty();
@@ -145,6 +176,32 @@ public class IpcSharedMemoryNativeLoader {
}
/**
+ * Tries to find shmem jar in IGNITE_HOME/libs folder.
+ * <p>
+ * The first file in the folder whose name ends with ".jar" and contains {@code JAR_NAME_BASE}
+ * is returned. Every failure is reported by adding an exception to {@code errs}; nothing is thrown.
+ *
+ * @param errs Collection of errors to add readable exception to.
+ * @param igniteHome Resolved IGNITE_HOME variable.
+ * @return File, if found, {@code null} otherwise.
+ */
+ private static File findShmemJar(Collection<Throwable> errs, String igniteHome) {
+ File libs = new File(igniteHome, "libs");
+
+ if (!libs.exists() || libs.isFile()) {
+ errs.add(new IllegalStateException("Failed to find libs folder in resolved IGNITE_HOME: " + igniteHome));
+
+ return null;
+ }
+
+ // File.listFiles() returns null on I/O error or when the directory cannot be read,
+ // so guard against it instead of failing with NullPointerException.
+ File[] files = libs.listFiles();
+
+ if (files != null) {
+ for (File lib : files) {
+ if (lib.getName().endsWith(".jar") && lib.getName().contains(JAR_NAME_BASE))
+ return lib;
+ }
+ }
+
+ errs.add(new IllegalStateException("Failed to find shmem jar in resolved IGNITE_HOME: " + igniteHome));
+
+ return null;
+ }
+
+ /**
* Gets temporary directory unique for each OS user.
* The directory guaranteed to exist, though may not be empty.
*/
@@ -220,6 +277,24 @@ public class IpcSharedMemoryNativeLoader {
/**
* @param errs Errors collection.
+ * @param jar Jar file to look the resource up in.
+ * @param tmpDir Directory the library file is extracted to.
+ * @param rsrcPath Path of the resource inside the jar.
+ * @return {@code True} if library was found and loaded.
+ */
+ private static boolean extractAndLoad(Collection<Throwable> errs, JarFile jar, File tmpDir, String rsrcPath) {
+ ZipEntry rsrc = jar.getEntry(rsrcPath);
+
+ if (rsrc != null)
+ return extract(errs, rsrc, jar, new File(tmpDir, mapLibraryName(LIB_NAME)));
+ else {
+ errs.add(new IllegalStateException("Failed to find resource within specified jar file " +
+ "[rsrc=" + rsrcPath + ", jar=" + jar.getName() + ']'));
+
+ return false;
+ }
+ }
+
+ /**
+ * @param errs Errors collection.
* @param src Source.
* @param target Target.
* @return {@code True} if resource was found and loaded.
@@ -230,7 +305,7 @@ public class IpcSharedMemoryNativeLoader {
InputStream is = null;
try {
- if (!target.exists() || !haveEqualMD5(target, src)) {
+ if (!target.exists() || !haveEqualMD5(target, src.openStream())) {
is = src.openStream();
if (is != null) {
@@ -265,20 +340,69 @@ public class IpcSharedMemoryNativeLoader {
}
/**
- * @param target Target.
+ * @param errs Errors collection.
* @param src Source.
+ * @param target Target.
+ * @return {@code True} if resource was found and loaded.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private static boolean extract(Collection<Throwable> errs, ZipEntry src, JarFile jar, File target) {
+ FileOutputStream os = null;
+ InputStream is = null;
+
+ try {
+ if (!target.exists() || !haveEqualMD5(target, jar.getInputStream(src))) {
+ is = jar.getInputStream(src);
+
+ if (is != null) {
+ os = new FileOutputStream(target);
+
+ int read;
+
+ byte[] buf = new byte[4096];
+
+ while ((read = is.read(buf)) != -1)
+ os.write(buf, 0, read);
+ }
+ }
+
+ // chmod 775.
+ if (!U.isWindows())
+ Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor();
+
+ System.load(target.getPath());
+
+ return true;
+ }
+ catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) {
+ errs.add(e);
+ }
+ finally {
+ U.closeQuiet(os);
+ U.closeQuiet(is);
+ }
+
+ return false;
+ }
+
+ /**
+ * @param target Target.
+ * @param srcIS Source input stream.
* @return {@code True} if target md5-sum equal to source md5-sum.
* @throws NoSuchAlgorithmException If md5 algorithm was not found.
* @throws IOException If an I/O exception occurs.
*/
- private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException {
- try (InputStream targetIS = new FileInputStream(target);
- InputStream srcIS = src.openStream()) {
-
- String targetMD5 = U.calculateMD5(targetIS);
- String srcMD5 = U.calculateMD5(srcIS);
+ private static boolean haveEqualMD5(File target, InputStream srcIS) throws NoSuchAlgorithmException, IOException {
+ try {
+ try (InputStream targetIS = new FileInputStream(target)) {
+ String targetMD5 = U.calculateMD5(targetIS);
+ String srcMD5 = U.calculateMD5(srcIS);
- return targetMD5.equals(srcMD5);
+ return targetMD5.equals(srcMD5);
+ }
+ }
+ finally {
+ srcIS.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 5185856..102c5b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -146,7 +146,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log);
pid = IpcSharedMemoryUtils.pid();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
index 2ddf6f3..c6f590e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
index 7dc0870..4afb64b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
@@ -51,7 +51,7 @@ public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
index 4c5413c..176429e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
@@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(log());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
index 8ff827b..8fee239 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
@@ -37,7 +37,7 @@ public class LoadWithCorruptedLibFileTestRunner {
createCorruptedLibFile();
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
index 28495af..89eeda1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
@@ -43,7 +43,7 @@ public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkP
* @throws IgniteCheckedException If failed.
*/
public static void main(String[] args) throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
index 2ade145..e8a8402 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkP
* @throws IgniteCheckedException If failed.
*/
public static void main(String[] args) throws IgniteCheckedException {
- IpcSharedMemoryNativeLoader.load();
+ IpcSharedMemoryNativeLoader.load(null);
int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index 517a587..a3c9bde 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 5cdbad0..2cfebd6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -34,8 +34,23 @@ import org.apache.spark.sql.SQLContext
*/
class IgniteContext[K, V](
@scala.transient val sparkContext: SparkContext,
- cfgF: () ⇒ IgniteConfiguration
+ cfgF: () ⇒ IgniteConfiguration,
+ client: Boolean = true
) extends Serializable {
+ @scala.transient private val driver = true
+
+ if (!client) {
+ val workers = sparkContext.getExecutorStorageStatus.length - 1
+
+ if (workers <= 0)
+ throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
+
+ println("Will start Ignite nodes on " + workers + " workers")
+
+ // Start ignite server node on each worker in server mode.
+ sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
+ }
+
def this(
sc: SparkContext,
springUrl: String
@@ -62,7 +77,7 @@ class IgniteContext[K, V](
catch {
case e: Exception ⇒
try {
- igniteCfg.setClientMode(true)
+ igniteCfg.setClientMode(client || driver)
Ignition.start(igniteCfg)
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0b8e845..0d1a3be 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -114,7 +114,7 @@ class IgniteRDD[K, V] (
ic.sqlContext.createDataFrame(rowRdd, schema)
}
- def saveValues(rdd: RDD[V]) = {
+ def saveValues(rdd: RDD[V], overwrite: Boolean = false) = {
rdd.foreachPartition(it ⇒ {
val ig = ic.ignite()
@@ -127,6 +127,8 @@ class IgniteRDD[K, V] (
val streamer = ig.dataStreamer[Object, V](cacheName)
try {
+ streamer.allowOverwrite(overwrite)
+
it.foreach(value ⇒ {
val key = affinityKeyFunc(value, node.orNull)
@@ -139,7 +141,7 @@ class IgniteRDD[K, V] (
})
}
- def savePairs(rdd: RDD[(K, V)]) = {
+ def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
rdd.foreachPartition(it ⇒ {
val ig = ic.ignite()
@@ -149,6 +151,8 @@ class IgniteRDD[K, V] (
val streamer = ig.dataStreamer[K, V](cacheName)
try {
+ streamer.allowOverwrite(overwrite)
+
it.foreach(tup ⇒ {
streamer.addData(tup._1, tup._2)
})
[28/29] incubator-ignite git commit: IGNITE-389 - Merge branch
ignite-sprint-5 into ignite-389-ipc
Posted by ag...@apache.org.
IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389-ipc
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1d8643c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1d8643c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1d8643c0
Branch: refs/heads/ignite-389-ipc
Commit: 1d8643c0b93786f7eeff82bb56b64e6df53b3697
Parents: a329e90 c9f7291
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 4 11:09:30 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 4 11:09:30 2015 -0700
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 13 +-
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../managers/communication/GridIoManager.java | 117 ++++----
.../processors/cache/GridCacheContext.java | 3 -
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 16 +-
.../GridDhtPartitionsExchangeFuture.java | 29 +-
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../processors/hadoop/HadoopTaskContext.java | 14 +-
.../igfs/IgfsSecondaryFileSystemImpl.java | 2 +-
.../internal/visor/query/VisorQueryJob.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 3 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 31 --
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 56 +++-
.../tcp/ipfinder/TcpDiscoveryIpFinder.java | 10 +-
.../TcpDiscoveryMulticastIpFinder.java | 47 +++-
.../cache/IgniteDynamicCacheStartSelfTest.java | 62 ++++
...niteDynamicCacheWithConfigStartSelfTest.java | 35 +--
.../igfs/IgfsClientCacheSelfTest.java | 9 +-
.../IgniteMessagingWithClientTest.java | 164 +++++++++++
.../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
.../testsuites/IgniteCacheTestSuite4.java | 1 +
.../IgniteSpiDiscoverySelfTestSuite.java | 1 +
.../gce/TcpDiscoveryGoogleStorageIpFinder.java | 43 +--
.../fs/IgniteHadoopFileSystemCounterWriter.java | 14 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 70 ++---
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 2 +-
.../processors/hadoop/HadoopDefaultJobInfo.java | 2 +-
.../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
.../hadoop/SecondaryFileSystemProvider.java | 4 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 20 +-
.../processors/hadoop/v2/HadoopV2Job.java | 31 +-
.../hadoop/v2/HadoopV2JobResourceManager.java | 26 +-
.../hadoop/v2/HadoopV2TaskContext.java | 48 +++-
.../hadoop/HadoopClientProtocolSelfTest.java | 6 +-
.../hadoop/HadoopAbstractSelfTest.java | 14 +-
.../hadoop/HadoopCommandLineTest.java | 14 +-
.../processors/hadoop/HadoopMapReduceTest.java | 176 +++++++++++-
.../hadoop/HadoopTaskExecutionSelfTest.java | 2 +-
.../hadoop/HadoopTasksAllVersionsTest.java | 15 +-
.../processors/hadoop/HadoopTasksV1Test.java | 5 +-
.../processors/hadoop/HadoopTasksV2Test.java | 5 +-
.../processors/hadoop/HadoopV2JobSelfTest.java | 6 +-
.../collections/HadoopAbstractMapTest.java | 12 +
...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++
.../IgniteCacheWithIndexingTestSuite.java | 2 +
.../commands/cache/VisorCacheScanCommand.scala | 2 +-
48 files changed, 1358 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d8643c0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d8643c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d8643c0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
[26/29] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-981' into ignite-sprint-5
Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-981' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/20e56777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/20e56777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/20e56777
Branch: refs/heads/ignite-389-ipc
Commit: 20e567773b5b0674754a348b6d41d29370f3bd87
Parents: a03d111 a6ea325
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 15:06:44 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 15:06:44 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 117 +++++++------
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../igfs/IgfsClientCacheSelfTest.java | 9 +-
.../IgniteMessagingWithClientTest.java | 164 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
5 files changed, 238 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
[13/29] incubator-ignite git commit: # IGNITE-988. Reworked scan
command.
Posted by ag...@apache.org.
# IGNITE-988. Reworked scan command.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46eab5b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46eab5b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46eab5b6
Branch: refs/heads/ignite-389-ipc
Commit: 46eab5b67f1f10260b27a1eb0474904982aa3725
Parents: bd3abbc
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 10:02:02 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 10:02:02 2015 +0700
----------------------------------------------------------------------
.../java/org/apache/ignite/internal/visor/query/VisorQueryJob.java | 2 +-
.../apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46eab5b6/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index 4a9daad..8915240 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -65,7 +65,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
try {
UUID nid = ignite.localNode().id();
- boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN");
+ boolean scan = arg.queryTxt() == null;
String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" +
UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46eab5b6/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
index 4b66720..3aa2a19 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
@@ -139,7 +139,7 @@ class VisorCacheScanCommand {
val firstPage =
try
executeRandom(groupForDataNode(node, cacheName),
- classOf[VisorQueryTask], new VisorQueryArg(cacheName, "SCAN", false, pageSize)) match {
+ classOf[VisorQueryTask], new VisorQueryArg(cacheName, null, false, pageSize)) match {
case x if x.get1() != null =>
error(x.get1())
[03/29] incubator-ignite git commit: # ignite-970
Posted by ag...@apache.org.
# ignite-970
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d6f9b647
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d6f9b647
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d6f9b647
Branch: refs/heads/ignite-389-ipc
Commit: d6f9b647ab92d822aebbef06315ccb0af41f8238
Parents: 39ce1cb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 15:39:12 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 15:39:12 2015 +0300
----------------------------------------------------------------------
modules/core/pom.xml | 1 -
.../tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java | 2 +-
2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d6f9b647/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 370fe69..8c37a4f 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -129,7 +129,6 @@
<groupId>org.gridgain</groupId>
<artifactId>ignite-shmem</artifactId>
<version>1.0.0</version>
- <scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d6f9b647/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 9909d76..5d25299 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -85,7 +85,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
/**
* @param useShmem Use shared mem.
*/
- protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
+ public GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
super(false);
this.useShmem = useShmem;
[14/29] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-991
Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-991
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/38c084a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/38c084a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/38c084a8
Branch: refs/heads/ignite-389-ipc
Commit: 38c084a81850d26e336382822574004b79fce935
Parents: 1de11ff bd3abbc
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:27:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:27:03 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/processors/cache/GridCacheContext.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
[23/29] incubator-ignite git commit: # ignite-981 fixed wait for
cache initialization on clients
Posted by ag...@apache.org.
# ignite-981 fixed wait for cache initialization on clients
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ddcb2a3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ddcb2a3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ddcb2a3f
Branch: refs/heads/ignite-389-ipc
Commit: ddcb2a3f6932fe8d3f86d3e1c16a3c4a4610959f
Parents: 1603fe5
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:25:42 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 11:50:36 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 117 +++++++------
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../IgniteMessagingWithClientTest.java | 164 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
4 files changed, 232 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6e8d457..4382731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1722,68 +1722,83 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- Object msgBody = ioMsg.body();
-
- assert msgBody != null || ioMsg.bodyBytes() != null;
+ busyLock.readLock();
try {
- byte[] msgTopicBytes = ioMsg.topicBytes();
-
- Object msgTopic = ioMsg.topic();
-
- GridDeployment dep = ioMsg.deployment();
-
- if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
- ioMsg.deploymentClassName() != null) {
- dep = ctx.deploy().getGlobalDeployment(
- ioMsg.deploymentMode(),
- ioMsg.deploymentClassName(),
- ioMsg.deploymentClassName(),
- ioMsg.userVersion(),
- nodeId,
- ioMsg.classLoaderId(),
- ioMsg.loaderParticipants(),
- null);
-
- if (dep == null)
- throw new IgniteDeploymentCheckedException(
- "Failed to obtain deployment information for user message. " +
- "If you are using custom message or topic class, try implementing " +
- "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
-
- ioMsg.deployment(dep); // Cache deployment.
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received user message while stopping (will ignore) [nodeId=" +
+ nodeId + ", msg=" + msg + ']');
+
+ return;
}
- // Unmarshall message topic if needed.
- if (msgTopic == null && msgTopicBytes != null) {
- msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+ Object msgBody = ioMsg.body();
- ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
- }
+ assert msgBody != null || ioMsg.bodyBytes() != null;
- if (!F.eq(topic, msgTopic))
- return;
+ try {
+ byte[] msgTopicBytes = ioMsg.topicBytes();
+
+ Object msgTopic = ioMsg.topic();
+
+ GridDeployment dep = ioMsg.deployment();
+
+ if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+ ioMsg.deploymentClassName() != null) {
+ dep = ctx.deploy().getGlobalDeployment(
+ ioMsg.deploymentMode(),
+ ioMsg.deploymentClassName(),
+ ioMsg.deploymentClassName(),
+ ioMsg.userVersion(),
+ nodeId,
+ ioMsg.classLoaderId(),
+ ioMsg.loaderParticipants(),
+ null);
+
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException(
+ "Failed to obtain deployment information for user message. " +
+ "If you are using custom message or topic class, try implementing " +
+ "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+ ioMsg.deployment(dep); // Cache deployment.
+ }
- if (msgBody == null) {
- msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+ // Unmarshall message topic if needed.
+ if (msgTopic == null && msgTopicBytes != null) {
+ msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
- ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
- }
+ ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+ }
- // Resource injection.
- if (dep != null)
- ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
- msg + ']', e);
- }
+ if (!F.eq(topic, msgTopic))
+ return;
+
+ if (msgBody == null) {
+ msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+
+ ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+ }
- if (msgBody != null) {
- if (predLsnr != null) {
- if (!predLsnr.apply(nodeId, msgBody))
- removeMessageListener(TOPIC_COMM_USER, this);
+ // Resource injection.
+ if (dep != null)
+ ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+ msg + ']', e);
+ }
+
+ if (msgBody != null) {
+ if (predLsnr != null) {
+ if (!predLsnr.apply(nodeId, msgBody))
+ removeMessageListener(TOPIC_COMM_USER, this);
+ }
+ }
+ }
+ finally {
+ busyLock.readUnlock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 1aef18c..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -274,7 +274,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
new file mode 100644
index 0000000..855a4f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.messaging;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Sends ordered messages from node 1 while a client node (index 2) is repeatedly started and stopped.
+ */
+public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implements Serializable {
+ /** IP finder set on the discovery SPI of every node started by this test. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Message topic. */
+ private enum TOPIC {
+ /** The only topic used by the listeners and senders in this test. */
+ ORDERED
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new OptimizedMarshaller(false));
+
+ if (gridName.equals(getTestGridName(2))) { // Only the node with index 2 is configured as a client.
+ cfg.setClientMode(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ }
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMessageSendWithClientJoin() throws Exception {
+ startGrid(0);
+
+ Ignite ignite1 = startGrid(1);
+
+ ClusterGroup rmts = ignite1.cluster().forRemotes();
+
+ IgniteMessaging msg = ignite1.message(rmts);
+
+ msg.localListen(TOPIC.ORDERED, new LocalListener());
+
+ msg.remoteListen(TOPIC.ORDERED, new RemoteListener());
+
+ final AtomicBoolean stop = new AtomicBoolean(); // Set in the finally block below to end the client start/stop loop.
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int iter = 0;
+
+ while (!stop.get()) {
+ if (iter % 10 == 0)
+ log.info("Client start/stop iteration: " + iter);
+
+ iter++;
+
+ try (Ignite ignite = startGrid(2)) { // try-with-resources closes the client node right after the check.
+ assertTrue(ignite.configuration().isClientMode());
+ }
+ }
+
+ return null;
+ }
+ }, 1, "client-start-stop");
+
+ try {
+ long stopTime = U.currentTimeMillis() + 30_000; // Keep sending for 30 seconds.
+
+ int iter = 0;
+
+ while (System.currentTimeMillis() < stopTime) {
+ try {
+ ignite1.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(iter), 0);
+ }
+ catch (IgniteException e) {
+ log.info("Message send failed: " + e); // Send failures are only logged; the loop keeps going.
+ }
+
+ iter++;
+
+ if (iter % 100 == 0)
+ Thread.sleep(5);
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ fut.get();
+ }
+
+ /**
+ * Local listener that ignores the message content and always returns {@code true}.
+ */
+ private static class LocalListener implements IgniteBiPredicate<UUID, String> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, String s) {
+ return true;
+ }
+ }
+
+ /**
+ * Remote listener that sends each received message to the node identified by {@code nodeId}, on the same topic.
+ */
+ private static class RemoteListener implements IgniteBiPredicate<UUID, String> {
+ /** Ignite instance (field annotated with {@link IgniteInstanceResource}). */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID nodeId, String msg) {
+ ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg);
+
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9eb31f1..e0a1e6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -53,6 +53,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridSelfTest.class));
suite.addTest(new TestSuite(GridProjectionSelfTest.class));
suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+ suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class));
if (U.isLinux() || U.isMacOs())
[16/29] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-989' into ignite-sprint-5
Posted by ag...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-989' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b12bb4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b12bb4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b12bb4a
Branch: refs/heads/ignite-389-ipc
Commit: 1b12bb4a80a04071c77d2603956505bd66fa19ea
Parents: bd3abbc 9ad3617
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:39:12 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:39:12 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 3 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 31 -----
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 56 +++++++-
.../tcp/ipfinder/TcpDiscoveryIpFinder.java | 10 +-
.../TcpDiscoveryMulticastIpFinder.java | 47 +++++--
.../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 1 +
.../gce/TcpDiscoveryGoogleStorageIpFinder.java | 43 ++++---
8 files changed, 244 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
[11/29] incubator-ignite git commit: IGNITE-991 - Fix cache start
from client node config.
Posted by ag...@apache.org.
IGNITE-991 - Fix cache start from client node config.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1de11fff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1de11fff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1de11fff
Branch: refs/heads/ignite-389-ipc
Commit: 1de11fff3cea7883a2f28a35107d7c9dfc75d5e0
Parents: 97d0bc1
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Jun 3 16:34:12 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Jun 3 16:34:12 2015 -0700
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 18 +++-
...niteDynamicCacheWithConfigStartSelfTest.java | 98 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 1 +
5 files changed, 116 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 2049d03..c3f3e7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -220,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) {
+ if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1ae4ae7..af121c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -249,7 +249,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) {
+ if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -276,7 +276,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (cctx.rebalanceEnabled()) {
for (int p = 0; p < num; p++) {
// If this is the first node in grid.
- boolean added = exchFut.isCacheAdded(cctx.cacheId());
+ boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) {
assert exchId.isJoined() || added;
@@ -668,7 +668,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
- ", allIds=" + allIds + ", node2part=" + node2part + ']';
+ ", allIds=" + allIds + ", node2part=" + node2part + ", cache=" + cctx.name() + ']';
Collection<UUID> nodeIds = part2node.get(p);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index db43c6c..a03e2e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -295,7 +295,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cacheId Cache ID to check.
* @return {@code True} if cache was added during this exchange.
*/
- public boolean isCacheAdded(int cacheId) {
+ public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
if (!F.isEmpty(reqs)) {
for (DynamicCacheChangeRequest req : reqs) {
if (req.start() && !req.clientStartOnly()) {
@@ -305,7 +305,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
- return false;
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
}
/**
@@ -505,11 +507,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (cacheCtx.isLocal())
continue;
- cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
-
GridDhtPartitionTopology top = cacheCtx.topology();
top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+
+ if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
+ initTopology(cacheCtx);
+
+ top.beforeExchange(this);
+ }
+ else
+ cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
}
if (exchId.isLeft())
@@ -566,7 +574,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert oldestNode.get() != null;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (isCacheAdded(cacheCtx.cacheId())) {
+ if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
new file mode 100644
index 0000000..dcd6a69
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ * Tests a cache that is declared only in a client node's configuration: the client writes and removes entries, server node 0 reads them.
+ */
+public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node started by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Name of the cache created by {@link #cacheConfiguration(String)}. */
+ private static final String CACHE_NAME = "partitioned";
+
+ /** If {@code true}, nodes are configured in client mode and get the cache configuration. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ if (client) // The cache configuration is set only for client nodes.
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @param cacheName Cache name (currently unused: the configuration is always created for {@link #CACHE_NAME}).
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration cacheConfiguration(String cacheName) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+ ccfg.setIndexedTypes(String.class, String.class);
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnClient() throws Exception {
+ int srvCnt = 3;
+
+ startGrids(srvCnt); // The 'client' field is still false here, so these nodes are configured with client mode off.
+
+ try {
+ client = true; // Configurations built from now on are client ones.
+
+ IgniteEx client = startGrid(srvCnt); // Local variable; shadows the 'client' field for the rest of the block.
+
+ for (int i = 0; i < 100; i++)
+ client.cache(CACHE_NAME).put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assertEquals(i, grid(0).cache(CACHE_NAME).get(i));
+
+ client.cache(CACHE_NAME).removeAll();
+
+ for (int i = 0; i < 100; i++)
+ assertNull(grid(0).cache(CACHE_NAME).get(i));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index a8019d2..15756d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -99,6 +99,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class);
suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
+ suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class);
suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);
suite.addTestSuite(IgniteCacheConfigurationTemplateTest.class);
suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);
[21/29] incubator-ignite git commit: Merge branches 'ignite-983' and
'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite
into ignite-983
Posted by ag...@apache.org.
Merge branches 'ignite-983' and 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-983
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46b24472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46b24472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46b24472
Branch: refs/heads/ignite-389-ipc
Commit: 46b244723a0c39c7bb5bb92157d71032d655923a
Parents: 51d4737 922c1c4
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:29:43 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:29:43 2015 +0700
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../processors/cache/GridCacheContext.java | 3 -
.../dht/GridDhtPartitionTopologyImpl.java | 8 +-
.../GridDhtPartitionsExchangeFuture.java | 10 +-
.../internal/visor/query/VisorQueryJob.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 3 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 31 -----
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 56 +++++++-
.../tcp/ipfinder/TcpDiscoveryIpFinder.java | 10 +-
.../TcpDiscoveryMulticastIpFinder.java | 47 +++++--
.../cache/IgniteDynamicCacheStartSelfTest.java | 62 +++++++++
.../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 1 +
.../gce/TcpDiscoveryGoogleStorageIpFinder.java | 43 ++++---
.../commands/cache/VisorCacheScanCommand.scala | 2 +-
15 files changed, 326 insertions(+), 84 deletions(-)
----------------------------------------------------------------------