Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:03 UTC
[71/92] [abbrv] ignite git commit: WIP.
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunication.java
deleted file mode 100644
index 20f780c..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ /dev/null
@@ -1,1460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.SocketChannel;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.GridKeyLock;
-import org.apache.ignite.internal.util.ipc.IpcEndpoint;
-import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryClientEndpoint;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
-import org.apache.ignite.internal.util.nio.GridBufferedParser;
-import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
-import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
-import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.nio.GridNioServerListener;
-import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
-
-/**
- * Hadoop external communication class.
- */
-public class HadoopExternalCommunication {
- /** IPC error message. */
- public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
- "(switching to TCP, may be slower).";
-
-    /** Default port on which the node listens (value is <tt>27100</tt>). */
- public static final int DFLT_PORT = 27100;
-
- /** Default connection timeout (value is <tt>1000</tt>ms). */
- public static final long DFLT_CONN_TIMEOUT = 1000;
-
-    /** Default maximum connection timeout (value is <tt>600,000</tt>ms). */
- public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
-
- /** Default reconnect attempts count (value is <tt>10</tt>). */
- public static final int DFLT_RECONNECT_CNT = 10;
-
-    /** Default message queue limit per connection (for incoming and outgoing messages). */
- public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;
-
-    /**
-     * Default count of selectors for TCP server (value is <tt>1</tt>).
-     */
- public static final int DFLT_SELECTORS_CNT = 1;
-
-    /** Process descriptor meta for session. */
- private static final int PROCESS_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Handshake finish meta for session. */
- private static final int HANDSHAKE_FINISH_META = GridNioSessionMetaKey.nextUniqueKey();
-
- /** Message tracker meta for session. */
- private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
-
- /**
- * Default local port range (value is <tt>100</tt>).
- * See {@link #setLocalPortRange(int)} for details.
- */
- public static final int DFLT_PORT_RANGE = 100;
-
- /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
- public static final boolean DFLT_TCP_NODELAY = true;
-
- /** Server listener. */
- private final GridNioServerListener<HadoopMessage> srvLsnr =
- new GridNioServerListenerAdapter<HadoopMessage>() {
-        /** {@inheritDoc} */
-        @Override public void onConnected(GridNioSession ses) {
- HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
- assert desc != null : "Received connected notification without finished handshake: " + ses;
- }
-
- /** {@inheritDoc} */
- @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- if (log.isDebugEnabled())
- log.debug("Closed connection for session: " + ses);
-
- if (e != null)
- U.error(log, "Session disconnected due to exception: " + ses, e);
-
- HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
- if (desc != null) {
- HadoopCommunicationClient rmv = clients.remove(desc.processId());
-
- if (rmv != null)
- rmv.forceClose();
- }
-
- HadoopMessageListener lsnr0 = lsnr;
-
- if (lsnr0 != null)
- // Notify listener about connection close.
- lsnr0.onConnectionLost(desc);
- }
-
- /** {@inheritDoc} */
- @Override public void onMessage(GridNioSession ses, HadoopMessage msg) {
- notifyListener(ses.<HadoopProcessDescriptor>meta(PROCESS_META), msg);
-
- if (msgQueueLimit > 0) {
- GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
- assert tracker != null : "Missing tracker for limited message queue: " + ses;
-
- tracker.run();
- }
- }
- };
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Local process descriptor. */
- private HadoopProcessDescriptor locProcDesc;
-
- /** Marshaller. */
- private Marshaller marsh;
-
- /** Message notification executor service. */
- private ExecutorService execSvc;
-
- /** Grid name. */
- private String gridName;
-
-    /** Local host address of this node. */
- private volatile InetAddress locHost;
-
- /** Local port which node uses. */
- private int locPort = DFLT_PORT;
-
- /** Local port range. */
- private int locPortRange = DFLT_PORT_RANGE;
-
- /** Local port which node uses to accept shared memory connections. */
- private int shmemPort = -1;
-
-    /** Whether to allocate a direct (rather than heap) buffer. */
- private boolean directBuf = true;
-
- /** Connect timeout. */
- private long connTimeout = DFLT_CONN_TIMEOUT;
-
- /** Maximum connect timeout. */
- private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
-
- /** Reconnect attempts count. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private int reconCnt = DFLT_RECONNECT_CNT;
-
- /** Socket send buffer. */
- private int sockSndBuf;
-
- /** Socket receive buffer. */
- private int sockRcvBuf;
-
- /** Message queue limit. */
- private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
-
- /** NIO server. */
- private GridNioServer<HadoopMessage> nioSrvr;
-
- /** Shared memory server. */
- private IpcSharedMemoryServerEndpoint shmemSrv;
-
- /** {@code TCP_NODELAY} option value for created sockets. */
- private boolean tcpNoDelay = DFLT_TCP_NODELAY;
-
- /** Shared memory accept worker. */
- private ShmemAcceptWorker shmemAcceptWorker;
-
- /** Shared memory workers. */
- private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
-
- /** Clients. */
- private final ConcurrentMap<UUID, HadoopCommunicationClient> clients = GridConcurrentFactory.newMap();
-
- /** Message listener. */
- private volatile HadoopMessageListener lsnr;
-
- /** Bound port. */
- private int boundTcpPort = -1;
-
- /** Bound port for shared memory server. */
- private int boundTcpShmemPort = -1;
-
- /** Count of selectors to use in TCP server. */
- private int selectorsCnt = DFLT_SELECTORS_CNT;
-
- /** Local node ID message. */
- private ProcessHandshakeMessage locIdMsg;
-
- /** Locks. */
- private final GridKeyLock locks = new GridKeyLock();
-
- /**
- * @param parentNodeId Parent node ID.
- * @param procId Process ID.
- * @param marsh Marshaller to use.
- * @param log Logger.
- * @param execSvc Executor service for message notification.
- * @param gridName Grid name.
- */
- public HadoopExternalCommunication(
- UUID parentNodeId,
- UUID procId,
- Marshaller marsh,
- IgniteLogger log,
- ExecutorService execSvc,
- String gridName
- ) {
- locProcDesc = new HadoopProcessDescriptor(parentNodeId, procId);
-
- this.marsh = marsh;
- this.log = log.getLogger(HadoopExternalCommunication.class);
- this.execSvc = execSvc;
- this.gridName = gridName;
- }
-
- /**
- * Sets local port for socket binding.
- * <p>
- * If not provided, default value is {@link #DFLT_PORT}.
- *
- * @param locPort Port number.
- */
- public void setLocalPort(int locPort) {
- this.locPort = locPort;
- }
-
- /**
- * Gets local port for socket binding.
- *
- * @return Local port.
- */
- public int getLocalPort() {
- return locPort;
- }
-
- /**
-     * Sets local port range for local host ports (value must be greater than or equal to <tt>0</tt>).
-     * If the provided local port (see {@link #setLocalPort(int)}) is occupied,
-     * the implementation will try to increment the port number for as long as it is less than
-     * the initial value plus this range.
- * <p>
- * If port range value is <tt>0</tt>, then implementation will try bind only to the port provided by
- * {@link #setLocalPort(int)} method and fail if binding to this port did not succeed.
- * <p>
-     * Local port range is very useful during development when more than one grid node needs to run
- * on the same physical machine.
- * <p>
- * If not provided, default value is {@link #DFLT_PORT_RANGE}.
- *
- * @param locPortRange New local port range.
- */
- public void setLocalPortRange(int locPortRange) {
- this.locPortRange = locPortRange;
- }
-
- /**
- * @return Local port range.
- */
- public int getLocalPortRange() {
- return locPortRange;
- }
-
- /**
- * Sets local port to accept shared memory connections.
- * <p>
- * If set to {@code -1} shared memory communication will be disabled.
- * <p>
- * If not provided, shared memory is disabled.
- *
- * @param shmemPort Port number.
- */
- public void setSharedMemoryPort(int shmemPort) {
- this.shmemPort = shmemPort;
- }
-
- /**
- * Gets shared memory port to accept incoming connections.
- *
- * @return Shared memory port.
- */
- public int getSharedMemoryPort() {
- return shmemPort;
- }
-
- /**
- * Sets connect timeout used when establishing connection
- * with remote nodes.
- * <p>
- * {@code 0} is interpreted as infinite timeout.
- * <p>
- * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
- *
- * @param connTimeout Connect timeout.
- */
- public void setConnectTimeout(long connTimeout) {
- this.connTimeout = connTimeout;
- }
-
- /**
- * @return Connection timeout.
- */
- public long getConnectTimeout() {
- return connTimeout;
- }
-
- /**
-     * Sets maximum connect timeout. If the handshake is not completed within the connect timeout,
-     * the SPI retries the handshake procedure with an increased connect timeout.
-     * The connect timeout can grow up to the maximum timeout value;
-     * once the maximum timeout value is reached, the handshake is considered failed.
- * <p>
- * {@code 0} is interpreted as infinite timeout.
- * <p>
- * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
- *
- * @param maxConnTimeout Maximum connect timeout.
- */
- public void setMaxConnectTimeout(long maxConnTimeout) {
- this.maxConnTimeout = maxConnTimeout;
- }
-
- /**
- * Gets maximum connection timeout.
- *
- * @return Maximum connection timeout.
- */
- public long getMaxConnectTimeout() {
- return maxConnTimeout;
- }
-
- /**
- * Sets maximum number of reconnect attempts used when establishing connection
- * with remote nodes.
- * <p>
- * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
- *
- * @param reconCnt Maximum number of reconnection attempts.
- */
- public void setReconnectCount(int reconCnt) {
- this.reconCnt = reconCnt;
- }
-
- /**
- * @return Reconnect count.
- */
- public int getReconnectCount() {
- return reconCnt;
- }
-
- /**
- * Sets flag to allocate direct or heap buffer in SPI.
- * If value is {@code true}, then SPI will use {@link ByteBuffer#allocateDirect(int)} call.
- * Otherwise, SPI will use {@link ByteBuffer#allocate(int)} call.
- * <p>
- * If not provided, default value is {@code true}.
- *
-     * @param directBuf Flag indicating whether to allocate a direct or heap buffer in the SPI.
- */
- public void setDirectBuffer(boolean directBuf) {
- this.directBuf = directBuf;
- }
-
- /**
- * @return Direct buffer flag.
- */
- public boolean isDirectBuffer() {
- return directBuf;
- }
-
- /**
-     * Sets the count of selectors to be used in the TCP server.
- * <p/>
- * If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
- *
- * @param selectorsCnt Selectors count.
- */
- public void setSelectorsCount(int selectorsCnt) {
- this.selectorsCnt = selectorsCnt;
- }
-
- /**
- * @return Number of selectors to use.
- */
- public int getSelectorsCount() {
- return selectorsCnt;
- }
-
- /**
-     * Sets value for {@code TCP_NODELAY} socket option. Each
-     * socket will be opened using the provided value.
-     * <p>
-     * Setting this option to {@code true} disables Nagle's algorithm
-     * for the socket, decreasing latency and delivery time for small messages.
-     * <p>
-     * For systems that work under heavy network load, it is advisable to
-     * set this value to {@code false}.
- * <p>
- * If not provided, default value is {@link #DFLT_TCP_NODELAY}.
- *
- * @param tcpNoDelay {@code True} to disable TCP delay.
- */
- public void setTcpNoDelay(boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- /**
-     * @return {@code TCP_NODELAY} flag.
- */
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
-
- /**
- * Sets receive buffer size for sockets created or accepted by this SPI.
- * <p>
-     * If not provided, default is {@code 0} which leaves the buffer unchanged after
- * socket creation (OS defaults).
- *
- * @param sockRcvBuf Socket receive buffer size.
- */
- public void setSocketReceiveBuffer(int sockRcvBuf) {
- this.sockRcvBuf = sockRcvBuf;
- }
-
- /**
- * @return Socket receive buffer size.
- */
- public int getSocketReceiveBuffer() {
- return sockRcvBuf;
- }
-
- /**
- * Sets send buffer size for sockets created or accepted by this SPI.
- * <p>
- * If not provided, default is {@code 0} which leaves the buffer unchanged
- * after socket creation (OS defaults).
- *
- * @param sockSndBuf Socket send buffer size.
- */
- public void setSocketSendBuffer(int sockSndBuf) {
- this.sockSndBuf = sockSndBuf;
- }
-
- /**
- * @return Socket send buffer size.
- */
- public int getSocketSendBuffer() {
- return sockSndBuf;
- }
-
- /**
- * Sets message queue limit for incoming and outgoing messages.
- * <p>
-     * When set to a positive number, the send queue is limited to the configured value.
- * {@code 0} disables the size limitations.
- * <p>
- * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}.
- *
- * @param msgQueueLimit Send queue size limit.
- */
- public void setMessageQueueLimit(int msgQueueLimit) {
- this.msgQueueLimit = msgQueueLimit;
- }
-
- /**
- * @return Message queue size limit.
- */
- public int getMessageQueueLimit() {
- return msgQueueLimit;
- }
-
- /**
- * Sets Hadoop communication message listener.
- *
- * @param lsnr Message listener.
- */
- public void setListener(HadoopMessageListener lsnr) {
- this.lsnr = lsnr;
- }
-
- /**
- * @return Outbound message queue size.
- */
- public int getOutboundMessagesQueueSize() {
- return nioSrvr.outboundMessagesQueueSize();
- }
-
- /**
- * Starts communication.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void start() throws IgniteCheckedException {
- try {
- locHost = U.getLocalHost();
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to initialize local address.", e);
- }
-
- try {
- shmemSrv = resetShmemServer();
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to start shared memory communication server.", e);
- }
-
- try {
- // This method potentially resets local port to the value
- // local node was bound to.
- nioSrvr = resetNioServer();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to initialize TCP server: " + locHost, e);
- }
-
- locProcDesc.address(locHost.getHostAddress());
- locProcDesc.sharedMemoryPort(boundTcpShmemPort);
- locProcDesc.tcpPort(boundTcpPort);
-
- locIdMsg = new ProcessHandshakeMessage(locProcDesc);
-
- if (shmemSrv != null) {
- shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
-
- new IgniteThread(shmemAcceptWorker).start();
- }
-
- nioSrvr.start();
- }
-
- /**
- * Gets local process descriptor.
- *
- * @return Local process descriptor.
- */
- public HadoopProcessDescriptor localProcessDescriptor() {
- return locProcDesc;
- }
-
- /**
- * Gets filters used by communication.
- *
- * @return Filters array.
- */
- private GridNioFilter[] filters() {
- return new GridNioFilter[] {
- new GridNioAsyncNotifyFilter(gridName, execSvc, log),
- new HandshakeAndBackpressureFilter(),
- new HadoopMarshallerFilter(marsh),
- new GridNioCodecFilter(new GridBufferedParser(directBuf, ByteOrder.nativeOrder()), log, false)
- };
- }
-
- /**
-     * Recreates the TCP server socket instance.
- *
- * @return Server instance.
- * @throws IgniteCheckedException Thrown if it's not possible to create server.
- */
- private GridNioServer<HadoopMessage> resetNioServer() throws IgniteCheckedException {
- if (boundTcpPort >= 0)
- throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);
-
- IgniteCheckedException lastEx = null;
-
- // If configured TCP port is busy, find first available in range.
- for (int port = locPort; port < locPort + locPortRange; port++) {
- try {
- GridNioServer<HadoopMessage> srvr =
- GridNioServer.<HadoopMessage>builder()
- .address(locHost)
- .port(port)
- .listener(srvLsnr)
- .logger(log.getLogger(GridNioServer.class))
- .selectorCount(selectorsCnt)
- .gridName(gridName)
- .tcpNoDelay(tcpNoDelay)
- .directBuffer(directBuf)
- .byteOrder(ByteOrder.nativeOrder())
- .socketSendBufferSize(sockSndBuf)
- .socketReceiveBufferSize(sockRcvBuf)
- .sendQueueLimit(msgQueueLimit)
- .directMode(false)
- .filters(filters())
- .build();
-
- boundTcpPort = port;
-
-            // Ack the port the TCP server was bound to.
- if (log.isInfoEnabled())
- log.info("Successfully bound to TCP port [port=" + boundTcpPort +
- ", locHost=" + locHost + ']');
-
- return srvr;
- }
- catch (IgniteCheckedException e) {
- lastEx = e;
-
- if (log.isDebugEnabled())
- log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
- ", locHost=" + locHost + ']');
- }
- }
-
- // If free port wasn't found.
- throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort +
- ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
- }
-
- /**
- * Creates new shared memory communication server.
- * @return Server.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
- if (boundTcpShmemPort >= 0)
- throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);
-
- if (shmemPort == -1 || U.isWindows())
- return null;
-
- IgniteCheckedException lastEx = null;
-
- // If configured TCP port is busy, find first available in range.
- for (int port = shmemPort; port < shmemPort + locPortRange; port++) {
- try {
- IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint(
- log.getLogger(IpcSharedMemoryServerEndpoint.class),
- locProcDesc.processId(), gridName);
-
- srv.setPort(port);
-
- srv.omitOutOfResourcesWarning(true);
-
- srv.start();
-
- boundTcpShmemPort = port;
-
-            // Ack the port the shared memory server was bound to.
- if (log.isInfoEnabled())
- log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
- ", locHost=" + locHost + ']');
-
- return srv;
- }
- catch (IgniteCheckedException e) {
- lastEx = e;
-
- if (log.isDebugEnabled())
- log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
- ", locHost=" + locHost + ']');
- }
- }
-
- // If free port wasn't found.
- throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
-            shmemPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
- }
-
- /**
- * Stops the server.
- *
-     * @throws IgniteCheckedException If failed to stop the server.
- */
- public void stop() throws IgniteCheckedException {
- // Stop TCP server.
- if (nioSrvr != null)
- nioSrvr.stop();
-
- U.cancel(shmemAcceptWorker);
- U.join(shmemAcceptWorker, log);
-
- U.cancel(shmemWorkers);
- U.join(shmemWorkers, log);
-
- shmemWorkers.clear();
-
- // Force closing on stop (safety).
- for (HadoopCommunicationClient client : clients.values())
- client.forceClose();
-
- // Clear resources.
- nioSrvr = null;
-
- boundTcpPort = -1;
- }
-
- /**
- * Sends message to Hadoop process.
- *
-     * @param desc Destination process descriptor.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If failed to send the message.
- */
- public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) throws
- IgniteCheckedException {
- assert desc != null;
- assert msg != null;
-
- if (log.isTraceEnabled())
- log.trace("Sending message to Hadoop process [desc=" + desc + ", msg=" + msg + ']');
-
- HadoopCommunicationClient client = null;
-
- boolean closeOnRelease = true;
-
- try {
- client = reserveClient(desc);
-
- client.sendMessage(desc, msg);
-
- closeOnRelease = false;
- }
- finally {
- if (client != null) {
- if (closeOnRelease) {
- client.forceClose();
-
- clients.remove(desc.processId(), client);
- }
- else
- client.release();
- }
- }
- }
-
- /**
-     * Returns an existing client for the given process or creates a new one.
-     *
-     * @param desc Descriptor of the process to which a client should be opened.
- * @return The existing or just created client.
- * @throws IgniteCheckedException Thrown if any exception occurs.
- */
- private HadoopCommunicationClient reserveClient(HadoopProcessDescriptor desc) throws IgniteCheckedException {
- assert desc != null;
-
- UUID procId = desc.processId();
-
- while (true) {
- HadoopCommunicationClient client = clients.get(procId);
-
- if (client == null) {
- if (log.isDebugEnabled())
- log.debug("Did not find client for remote process [locProcDesc=" + locProcDesc + ", desc=" +
- desc + ']');
-
- // Do not allow concurrent connects.
- Object sync = locks.lock(procId);
-
- try {
- client = clients.get(procId);
-
- if (client == null) {
- HadoopCommunicationClient old = clients.put(procId, client = createNioClient(desc));
-
- assert old == null;
- }
- }
- finally {
- locks.unlock(procId, sync);
- }
-
- assert client != null;
- }
-
- if (client.reserve())
- return client;
- else
- // Client has just been closed by idle worker. Help it and try again.
- clients.remove(procId, client);
- }
- }
-
- /**
- * @param desc Process descriptor.
- * @return Client.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable protected HadoopCommunicationClient createNioClient(HadoopProcessDescriptor desc)
- throws IgniteCheckedException {
- assert desc != null;
-
- int shmemPort = desc.sharedMemoryPort();
-
-        // If the remote process has the shared memory server enabled and shares this process's parent node ID,
-        // then we are likely to run on the same host and shared memory communication can be tried.
- if (shmemPort != -1 && locProcDesc.parentNodeId().equals(desc.parentNodeId())) {
- try {
- return createShmemClient(desc, shmemPort);
- }
- catch (IgniteCheckedException e) {
- if (e.hasCause(IpcOutOfSystemResourcesException.class))
- // Has cause or is itself the IpcOutOfSystemResourcesException.
- LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
- else if (log.isDebugEnabled())
- log.debug("Failed to establish shared memory connection with local hadoop process: " +
- desc);
- }
- }
-
- return createTcpClient(desc);
- }
-
- /**
- * @param desc Process descriptor.
- * @param port Port.
- * @return Client.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable protected HadoopCommunicationClient createShmemClient(HadoopProcessDescriptor desc, int port)
- throws IgniteCheckedException {
- int attempt = 1;
-
- int connectAttempts = 1;
-
- long connTimeout0 = connTimeout;
-
- while (true) {
- IpcEndpoint clientEndpoint;
-
- try {
- clientEndpoint = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log);
- }
- catch (IgniteCheckedException e) {
-                // Retry the connection once more if it was not established.
- if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
- connectAttempts++;
-
- continue;
- }
-
- throw e;
- }
-
- HadoopCommunicationClient client = null;
-
- try {
- ShmemWorker worker = new ShmemWorker(clientEndpoint, false);
-
- shmemWorkers.add(worker);
-
- GridNioSession ses = worker.session();
-
- HandshakeFinish fin = new HandshakeFinish();
-
-                // We are under the lock, so it is safe to get the session and attach meta.
- ses.addMeta(HANDSHAKE_FINISH_META, fin);
-
- client = new HadoopTcpNioCommunicationClient(ses);
-
- new IgniteThread(worker).start();
-
- fin.await(connTimeout0);
- }
- catch (HadoopHandshakeTimeoutException e) {
- if (log.isDebugEnabled())
- log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
- ", err=" + e.getMessage() + ", client=" + client + ']');
-
- if (client != null)
- client.forceClose();
-
- if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
- if (log.isDebugEnabled())
-                    log.debug("Handshake timed out (will stop attempts to perform the handshake) " +
- "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
- ", attempt=" + attempt + ", reconCnt=" + reconCnt +
- ", err=" + e.getMessage() + ", client=" + client + ']');
-
- throw e;
- }
- else {
- attempt++;
-
- connTimeout0 *= 2;
-
- continue;
- }
- }
- catch (RuntimeException | Error e) {
- if (log.isDebugEnabled())
- log.debug(
- "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
-
- if (client != null)
- client.forceClose();
-
- throw e;
- }
-
- return client;
- }
- }
-
- /**
-     * Establishes a TCP connection to the remote Hadoop process and returns a client.
- *
- * @param desc Process descriptor.
- * @return Client.
- * @throws IgniteCheckedException If failed.
- */
- protected HadoopCommunicationClient createTcpClient(HadoopProcessDescriptor desc) throws IgniteCheckedException {
- String addr = desc.address();
-
- int port = desc.tcpPort();
-
- if (log.isDebugEnabled())
- log.debug("Trying to connect to remote process [locProcDesc=" + locProcDesc + ", desc=" + desc + ']');
-
- boolean conn = false;
- HadoopTcpNioCommunicationClient client = null;
- IgniteCheckedException errs = null;
-
- int connectAttempts = 1;
-
- long connTimeout0 = connTimeout;
-
- int attempt = 1;
-
- while (!conn) { // Reconnection on handshake timeout.
- try {
- SocketChannel ch = SocketChannel.open();
-
- ch.configureBlocking(true);
-
- ch.socket().setTcpNoDelay(tcpNoDelay);
- ch.socket().setKeepAlive(true);
-
- if (sockRcvBuf > 0)
- ch.socket().setReceiveBufferSize(sockRcvBuf);
-
- if (sockSndBuf > 0)
- ch.socket().setSendBufferSize(sockSndBuf);
-
- ch.socket().connect(new InetSocketAddress(addr, port), (int)connTimeout);
-
- HandshakeFinish fin = new HandshakeFinish();
-
- GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
-
- client = new HadoopTcpNioCommunicationClient(ses);
-
- if (log.isDebugEnabled())
- log.debug("Waiting for handshake finish for client: " + client);
-
- fin.await(connTimeout0);
-
- conn = true;
- }
- catch (HadoopHandshakeTimeoutException e) {
- if (client != null) {
- client.forceClose();
-
- client = null;
- }
-
- if (log.isDebugEnabled())
-                    log.debug(
-                        "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                        ", desc=" + desc + ", port=" + port + ", err=" + e + ']');
-
- if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
- if (log.isDebugEnabled())
- log.debug("Handshake timed out (will stop attempts to perform the handshake) " +
- "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
- ", attempt=" + attempt + ", reconCnt=" + reconCnt +
- ", err=" + e.getMessage() + ", addr=" + addr + ']');
-
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to remote Hadoop process " +
- "(is process still running?) [desc=" + desc + ", addrs=" + addr + ']');
-
- errs.addSuppressed(e);
-
- break;
- }
- else {
- attempt++;
-
- connTimeout0 *= 2;
-
- // Continue loop.
- }
- }
- catch (Exception e) {
- if (client != null) {
- client.forceClose();
-
- client = null;
- }
-
- if (log.isDebugEnabled())
- log.debug("Client creation failed [addr=" + addr + ", port=" + port +
- ", err=" + e + ']');
-
- if (X.hasCause(e, SocketTimeoutException.class))
- LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
- "configuration property) [addr=" + addr + ", port=" + port + ']');
-
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to remote Hadoop process (is process still running?) " +
- "[desc=" + desc + ", addrs=" + addr + ']');
-
- errs.addSuppressed(e);
-
-                // Retry the connection once more if it was not established.
- if (connectAttempts < 2 &&
- (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
- connectAttempts++;
-
- continue;
- }
-
- break;
- }
- }
-
- if (client == null) {
- assert errs != null;
-
- if (X.hasCause(errs, ConnectException.class))
-            LT.warn(log, null, "Failed to connect to a remote Hadoop process (is process still running?). " +
-                "Make sure the operating system firewall is disabled on local and remote hosts " +
-                "[addrs=" + addr + ", port=" + port + ']');
-
- throw errs;
- }
-
- if (log.isDebugEnabled())
- log.debug("Created client: " + client);
-
- return client;
- }
-
- /**
- * @param desc Sender process descriptor.
- * @param msg Communication message.
- */
- protected void notifyListener(HadoopProcessDescriptor desc, HadoopMessage msg) {
- HadoopMessageListener lsnr = this.lsnr;
-
- if (lsnr != null)
- // Notify listener of a new message.
- lsnr.onMessageReceived(desc, msg);
- else if (log.isDebugEnabled())
- log.debug("Received communication message without any registered listeners (will ignore) " +
- "[senderProcDesc=" + desc + ", msg=" + msg + ']');
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopExternalCommunication.class, this);
- }
-
- /**
-     * This worker takes responsibility for shutting the server down when stopping;
-     * no other thread shall stop the passed server.
- */
- private class ShmemAcceptWorker extends GridWorker {
- /** */
- private final IpcSharedMemoryServerEndpoint srv;
-
- /**
- * @param srv Server.
- */
- ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
- super(gridName, "shmem-communication-acceptor", HadoopExternalCommunication.this.log);
-
- this.srv = srv;
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- try {
- while (!Thread.interrupted()) {
- ShmemWorker e = new ShmemWorker(srv.accept(), true);
-
- shmemWorkers.add(e);
-
- new IgniteThread(e).start();
- }
- }
- catch (IgniteCheckedException e) {
- if (!isCancelled())
- U.error(log, "Shmem server failed.", e);
- }
- finally {
- srv.close();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- super.cancel();
-
- srv.close();
- }
- }
-
-    /**
-     * Shared memory worker that serves a single IPC endpoint.
-     */
- private class ShmemWorker extends GridWorker {
- /** */
- private final IpcEndpoint endpoint;
-
- /** Adapter. */
- private HadoopIpcToNioAdapter<HadoopMessage> adapter;
-
- /**
-         * @param endpoint Endpoint.
-         * @param accepted Whether the endpoint was accepted by the server.
- */
- private ShmemWorker(IpcEndpoint endpoint, boolean accepted) {
- super(gridName, "shmem-worker", HadoopExternalCommunication.this.log);
-
- this.endpoint = endpoint;
-
- adapter = new HadoopIpcToNioAdapter<>(
- HadoopExternalCommunication.this.log,
- endpoint,
- accepted,
- srvLsnr,
- filters());
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- try {
- adapter.serve();
- }
- finally {
- shmemWorkers.remove(this);
-
- endpoint.close();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- super.cancel();
-
- endpoint.close();
- }
-
-        /** {@inheritDoc} */
- @Override protected void cleanup() {
- super.cleanup();
-
- endpoint.close();
- }
-
-        /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ShmemWorker.class, this);
- }
-
- /**
- * @return NIO session for this worker.
- */
- public GridNioSession session() {
- return adapter.session();
- }
- }
-
-    /**
-     * Handshake finish latch.
-     */
- private static class HandshakeFinish {
- /** Await latch. */
- private CountDownLatch latch = new CountDownLatch(1);
-
- /**
- * Finishes handshake.
- */
- public void finish() {
- latch.countDown();
- }
-
- /**
- * @param time Time to wait.
- * @throws HadoopHandshakeTimeoutException If failed to wait.
- */
- public void await(long time) throws HadoopHandshakeTimeoutException {
- try {
- if (!latch.await(time, TimeUnit.MILLISECONDS))
- throw new HadoopHandshakeTimeoutException("Failed to wait for handshake to finish [timeout=" +
- time + ']');
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new HadoopHandshakeTimeoutException("Failed to wait for handshake to finish (thread was " +
- "interrupted) [timeout=" + time + ']', e);
- }
- }
- }
-
-    /**
-     * Handshake and backpressure filter.
-     */
- private class HandshakeAndBackpressureFilter extends GridNioFilterAdapter {
- /**
- * Assigns filter name to a filter.
- */
- protected HandshakeAndBackpressureFilter() {
- super("HadoopHandshakeFilter");
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionOpened(final GridNioSession ses) throws IgniteCheckedException {
- if (ses.accepted()) {
- if (log.isDebugEnabled())
- log.debug("Accepted connection, initiating handshake: " + ses);
-
- // Server initiates handshake.
- ses.send(locIdMsg).listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- try {
- // Make sure there were no errors.
- fut.get();
- }
- catch (IgniteCheckedException e) {
- log.warning("Failed to send handshake message, will close session: " + ses, e);
-
- ses.close();
- }
- }
- });
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionClosed(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
- proceedExceptionCaught(ses, ex);
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
- if (ses.meta(PROCESS_META) == null && !(msg instanceof ProcessHandshakeMessage))
- log.warning("Writing message before handshake has finished [ses=" + ses + ", msg=" + msg + ']');
-
- return proceedSessionWrite(ses, msg);
- }
-
- /** {@inheritDoc} */
- @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
- HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
- UUID rmtProcId = desc == null ? null : desc.processId();
-
- if (rmtProcId == null) {
- if (!(msg instanceof ProcessHandshakeMessage)) {
- log.warning("Invalid handshake message received, will close connection [ses=" + ses +
- ", msg=" + msg + ']');
-
- ses.close();
-
- return;
- }
-
- ProcessHandshakeMessage nId = (ProcessHandshakeMessage)msg;
-
- if (log.isDebugEnabled())
- log.debug("Received handshake message [ses=" + ses + ", msg=" + msg + ']');
-
- ses.addMeta(PROCESS_META, nId.processDescriptor());
-
- if (!ses.accepted())
- // Send handshake reply.
- ses.send(locIdMsg);
- else {
- rmtProcId = nId.processDescriptor().processId();
-
- if (log.isDebugEnabled())
- log.debug("Finished handshake with remote client: " + ses);
-
- Object sync = locks.tryLock(rmtProcId);
-
- if (sync != null) {
- try {
- if (clients.get(rmtProcId) == null) {
- if (log.isDebugEnabled())
- log.debug("Will reuse session for descriptor: " + rmtProcId);
-
- // Handshake finished flag is true.
- clients.put(rmtProcId, new HadoopTcpNioCommunicationClient(ses));
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Will not reuse client as another already exists [locProcDesc=" +
- locProcDesc + ", desc=" + desc + ']');
- }
- }
- finally {
- locks.unlock(rmtProcId, sync);
- }
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Concurrent connection is being established, will not reuse client session [" +
- "locProcDesc=" + locProcDesc + ", desc=" + desc + ']');
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Handshake is finished for session [ses=" + ses + ", locProcDesc=" + locProcDesc + ']');
-
- HandshakeFinish to = ses.meta(HANDSHAKE_FINISH_META);
-
- if (to != null)
- to.finish();
-
- // Notify session opened (both parties).
- proceedSessionOpened(ses);
- }
- else {
- if (msgQueueLimit > 0) {
- GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
- if (tracker == null) {
- GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker =
- new GridNioMessageTracker(ses, msgQueueLimit));
-
- assert old == null;
- }
-
- tracker.onMessageReceived();
- }
-
- proceedMessageReceived(ses, msg);
- }
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) throws IgniteCheckedException {
- return proceedSessionClose(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionIdleTimeout(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionWriteTimeout(ses);
- }
- }
-
- /**
-     * Process handshake message carrying the process descriptor.
- */
- @SuppressWarnings("PublicInnerClass")
- public static class ProcessHandshakeMessage implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
-        /** Process descriptor. */
- private HadoopProcessDescriptor procDesc;
-
- /** */
- public ProcessHandshakeMessage() {
- // No-op.
- }
-
- /**
- * @param procDesc Process descriptor.
- */
- private ProcessHandshakeMessage(HadoopProcessDescriptor procDesc) {
- this.procDesc = procDesc;
- }
-
- /**
-         * @return Process descriptor.
- */
- public HadoopProcessDescriptor processDescriptor() {
- return procDesc;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(procDesc);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- procDesc = (HadoopProcessDescriptor)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ProcessHandshakeMessage.class, this);
- }
- }
-}
\ No newline at end of file
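For context, the deleted HadoopExternalCommunication class was driven through the constructor
and setters shown in the diff above. The following is a minimal usage sketch; the variable
values and the listener body are illustrative assumptions, not code from the repository:

    // Hypothetical wiring of the removed class (all argument values are assumptions).
    HadoopExternalCommunication comm = new HadoopExternalCommunication(
        parentNodeId,   // UUID of the parent Ignite node
        procId,         // UUID of this external Hadoop process
        marsh,          // Marshaller for message (de)serialization
        log,            // IgniteLogger
        execSvc,        // ExecutorService for message notification
        "grid");        // Grid name

    comm.setLocalPort(27100);           // DFLT_PORT
    comm.setLocalPortRange(100);        // probe ports 27100..27199 if busy
    comm.setSharedMemoryPort(-1);       // -1 disables the shared memory endpoint
    comm.setConnectTimeout(1000);       // initial handshake timeout, ms
    comm.setMaxConnectTimeout(600000);  // cap for the doubling handshake timeout

    comm.setListener(new HadoopMessageListener() {
        @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
            // Process an incoming message.
        }

        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
            // React to a lost connection.
        }
    });

    comm.start();                 // binds the TCP (and optionally shmem) endpoints
    comm.sendMessage(desc, msg);  // reserves or creates a client, then sends
    comm.stop();                  // closes clients and stops the NIO server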
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
deleted file mode 100644
index 5f92cef..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/** Internal exception class for proper timeout handling. */
-class HadoopHandshakeTimeoutException extends IgniteCheckedException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param msg Message.
- */
- HadoopHandshakeTimeoutException(String msg) {
- super(msg);
- }
-
- /**
- * @param msg Message.
- * @param cause Cause.
- */
- HadoopHandshakeTimeoutException(String msg, @Nullable Throwable cause) {
- super(msg, cause);
- }
-}
\ No newline at end of file
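This exception is what the connect loops in the first deleted file catch to drive handshake
retries with exponential backoff. A condensed sketch of that pattern, following the deleted
createTcpClient()/createShmemClient() bodies (field and variable names as in the deleted code):

    // Condensed handshake retry loop from the deleted communication class.
    long connTimeout0 = connTimeout; // start from the configured connect timeout
    int attempt = 1;

    while (true) {
        try {
            fin.await(connTimeout0); // throws HadoopHandshakeTimeoutException on timeout

            break; // Handshake finished within the timeout.
        }
        catch (HadoopHandshakeTimeoutException e) {
            // Give up once attempts or the total timeout budget are exhausted.
            if (attempt == reconCnt || connTimeout0 > maxConnTimeout)
                throw e;

            attempt++;

            connTimeout0 *= 2; // Exponential backoff on the handshake timeout.
        }
    }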
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopIpcToNioAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
deleted file mode 100644
index 6f8be77..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.ipc.IpcEndpoint;
-import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
-import org.apache.ignite.internal.util.nio.GridNioFilterChain;
-import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
-import org.apache.ignite.internal.util.nio.GridNioServerListener;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
-
-/**
- * Allows re-using existing {@link GridNioFilter}s for IPC (specifically shared memory IPC)
- * communications.
- *
- * Note that this class consumes an entire thread inside the {@link #serve()} method
- * in order to serve one {@link org.apache.ignite.internal.util.ipc.IpcEndpoint}.
- */
-public class HadoopIpcToNioAdapter<T> {
- /** */
- private final IpcEndpoint endp;
-
- /** */
- private final GridNioFilterChain<T> chain;
-
- /** */
- private final GridNioSessionImpl ses;
-
- /** */
- private final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
-
- /** */
- private final ByteBuffer writeBuf;
-
- /**
- * @param log Log.
-     * @param endp Endpoint.
-     * @param accepted Accepted flag for the created session.
- * @param lsnr Listener.
- * @param filters Filters.
- */
- public HadoopIpcToNioAdapter(IgniteLogger log, IpcEndpoint endp, boolean accepted,
- GridNioServerListener<T> lsnr, GridNioFilter... filters) {
- this.endp = endp;
-
- chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
- ses = new GridNioSessionImpl(chain, null, null, accepted);
-
- writeBuf = ByteBuffer.allocate(8 << 10);
-
- writeBuf.order(ByteOrder.nativeOrder());
- }
-
- /**
-     * Serves the endpoint by repeatedly reading data and passing it through the filter chain.
- *
- * @throws InterruptedException If interrupted.
- */
- public void serve() throws InterruptedException {
- try {
- chain.onSessionOpened(ses);
-
- InputStream in = endp.inputStream();
-
- ByteBuffer readBuf = ByteBuffer.allocate(8 << 10);
-
- readBuf.order(ByteOrder.nativeOrder());
-
- assert readBuf.hasArray();
-
- while (!Thread.interrupted()) {
- int pos = readBuf.position();
-
- int read = in.read(readBuf.array(), pos, readBuf.remaining());
-
- if (read > 0) {
- readBuf.position(0);
- readBuf.limit(pos + read);
-
- chain.onMessageReceived(ses, readBuf);
-
- if (readBuf.hasRemaining())
- readBuf.compact();
- else
- readBuf.clear();
-
- CountDownLatch latch = latchRef.get();
-
- if (latch != null)
- latch.await();
- }
- else if (read < 0) {
- endp.close();
-
- break; // And close below.
- }
- }
-
- // Assuming remote end closed connection - pushing event from head to tail.
- chain.onSessionClosed(ses);
- }
- catch (Exception e) {
- chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to read from IPC endpoint.", e));
- }
- }
-
- /**
- * Gets dummy session for this adapter.
- *
- * @return Session.
- */
- public GridNioSession session() {
- return ses;
- }
-
- /**
- * Handles write events on chain.
- *
- * @param msg Buffer to send.
- * @return Send result.
- */
- private GridNioFuture<?> send(ByteBuffer msg) {
- assert writeBuf.hasArray();
-
- try {
- while (msg.hasRemaining()) {
- writeBuf.clear();
-
- writeBuf.put(msg);
-
- endp.outputStream().write(writeBuf.array(), 0, writeBuf.position());
- }
- }
- catch (IOException | IgniteCheckedException e) {
- return new GridNioFinishedFuture<Object>(e);
- }
-
- return new GridNioFinishedFuture<>((Object)null);
- }
-
- /**
-     * Filter forwarding messages from the chain's head to this adapter's endpoint.
- */
- private class HeadFilter extends GridNioFilterAdapter {
- /**
- * Assigns filter name.
- */
- protected HeadFilter() {
- super("HeadFilter");
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionOpened(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionClosed(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
- proceedExceptionCaught(ses, ex);
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
- assert ses == HadoopIpcToNioAdapter.this.ses : "ses=" + ses +
- ", this.ses=" + HadoopIpcToNioAdapter.this.ses;
-
- return send((ByteBuffer)msg);
- }
-
- /** {@inheritDoc} */
- @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
- proceedMessageReceived(ses, msg);
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException {
- // This call should be synced externally to avoid races.
- boolean b = latchRef.compareAndSet(null, new CountDownLatch(1));
-
- assert b;
-
- return new GridNioFinishedFuture<>(b);
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException {
- // This call should be synced externally to avoid races.
- CountDownLatch latch = latchRef.getAndSet(null);
-
- if (latch != null)
- latch.countDown();
-
- return new GridNioFinishedFuture<Object>(latch != null);
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) {
- assert ses == HadoopIpcToNioAdapter.this.ses;
-
- boolean closed = HadoopIpcToNioAdapter.this.ses.setClosed();
-
- if (closed)
- endp.close();
-
- return new GridNioFinishedFuture<>(closed);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionIdleTimeout(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionWriteTimeout(ses);
- }
- }
-}
\ No newline at end of file
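The serve() loop above relies on the standard ByteBuffer read/compact idiom for incremental
parsing. A standalone illustration of that idiom, independent of the Ignite classes (the
Parser interface here is a placeholder standing in for the filter chain):

    import java.io.InputStream;
    import java.nio.ByteBuffer;

    public final class ReadLoop {
        /** Placeholder for the filter chain: consumes as many complete messages as it can. */
        interface Parser {
            void onBytes(ByteBuffer buf);
        }

        static void serve(InputStream in, Parser parser) throws Exception {
            ByteBuffer buf = ByteBuffer.allocate(8 << 10);

            while (true) {
                int pos = buf.position();

                // Append new bytes after any unparsed remainder.
                int read = in.read(buf.array(), pos, buf.remaining());

                if (read < 0)
                    break; // Remote end closed the stream.

                buf.position(0);
                buf.limit(pos + read);

                parser.onBytes(buf); // advances position past complete messages

                if (buf.hasRemaining())
                    buf.compact(); // keep the head of a partial message for the next read
                else
                    buf.clear();
            }
        }
    }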
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMarshallerFilter.java
deleted file mode 100644
index 2d88490..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMarshallerFilter.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.marshaller.Marshaller;
-
-/**
- * Serialization filter.
- */
-public class HadoopMarshallerFilter extends GridNioFilterAdapter {
- /** Marshaller. */
- private Marshaller marshaller;
-
- /**
- * @param marshaller Marshaller to use.
- */
- public HadoopMarshallerFilter(Marshaller marshaller) {
- super("HadoopMarshallerFilter");
-
- this.marshaller = marshaller;
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionOpened(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionClosed(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
- proceedExceptionCaught(ses, ex);
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
- assert msg instanceof HadoopMessage : "Invalid message type: " + msg;
-
- return proceedSessionWrite(ses, marshaller.marshal(msg));
- }
-
-    /** {@inheritDoc} */
-    @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
- assert msg instanceof byte[];
-
- // Always unmarshal with system classloader.
- proceedMessageReceived(ses, marshaller.unmarshal((byte[])msg, null));
- }
-
- /** {@inheritDoc} */
- @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) throws IgniteCheckedException {
- return proceedSessionClose(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionIdleTimeout(ses);
- }
-
- /** {@inheritDoc} */
- @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionWriteTimeout(ses);
- }
-}
\ No newline at end of file
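For reference, this filter sat in the middle of the NIO chain assembled by
HadoopExternalCommunication.filters() in the first deleted file. The per-filter notes below
are inferred from the filter names and their usage, not comments from the original sources:

    // Chain from the deleted HadoopExternalCommunication.filters(), annotated.
    private GridNioFilter[] filters() {
        return new GridNioFilter[] {
            // Dispatches listener notifications onto the executor service threads.
            new GridNioAsyncNotifyFilter(gridName, execSvc, log),
            // Performs the process handshake and per-session message tracking.
            new HandshakeAndBackpressureFilter(),
            // Marshals HadoopMessage objects to byte[] and back.
            new HadoopMarshallerFilter(marsh),
            // Frames raw bytes on the wire via the buffered parser.
            new GridNioCodecFilter(new GridBufferedParser(directBuf, ByteOrder.nativeOrder()), log, false)
        };
    }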
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMessageListener.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMessageListener.java
deleted file mode 100644
index 868b409..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopMessageListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;
-
-/**
- * Hadoop communication message listener.
- */
-public interface HadoopMessageListener {
- /**
- * Called when a message is received from a remote process.
- *
- * @param desc Process descriptor.
- * @param msg Hadoop message.
- */
- public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg);
-
- /**
- * Called when the connection to the remote process is lost.
- *
- * @param desc Process descriptor.
- */
- public void onConnectionLost(HadoopProcessDescriptor desc);
-}
\ No newline at end of file
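An illustrative implementation of the deleted listener contract (the class name and log output below are hypothetical; both callbacks receive the descriptor of the remote process the event relates to):

    import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
    import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;

    /** Hypothetical listener that only logs traffic; not part of the module. */
    public class LoggingMessageListener implements HadoopMessageListener {
        /** {@inheritDoc} */
        @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
            System.out.println("Received " + msg + " from " + desc);
        }

        /** {@inheritDoc} */
        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
            System.out.println("Connection lost: " + desc);
        }
    }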
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
deleted file mode 100644
index 9428bb4..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Grid client for NIO server.
- */
-public class HadoopTcpNioCommunicationClient extends HadoopAbstractCommunicationClient {
- /** Socket. */
- private final GridNioSession ses;
-
- /**
- * Constructor for test purposes only.
- */
- public HadoopTcpNioCommunicationClient() {
- ses = null;
- }
-
- /**
- * @param ses Session.
- */
- public HadoopTcpNioCommunicationClient(GridNioSession ses) {
- assert ses != null;
-
- this.ses = ses;
- }
-
- /** {@inheritDoc} */
- @Override public boolean close() {
- boolean res = super.close();
-
- if (res)
- ses.close();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void forceClose() {
- super.forceClose();
-
- ses.close();
- }
-
- /** {@inheritDoc} */
- @Override public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg)
- throws IgniteCheckedException {
- if (closed())
- throw new IgniteCheckedException("Client was closed: " + this);
-
- GridNioFuture<?> fut = ses.send(msg);
-
- if (fut.isDone())
- fut.get();
- }
-
- /** {@inheritDoc} */
- @Override public long getIdleTime() {
- long now = U.currentTimeMillis();
-
- // Session can be used for receiving and sending.
- return Math.min(Math.min(now - ses.lastReceiveTime(), now - ses.lastSendScheduleTime()),
- now - ses.lastSendTime());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopTcpNioCommunicationClient.class, this, super.toString());
- }
-}
\ No newline at end of file
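Note the semantics of the deleted client above: sendMessage() performs a non-blocking session write and only calls fut.get() when the future has already completed, so immediate failures surface while successful sends stay asynchronous; getIdleTime() takes the minimum across the receive, send-schedule, and send timestamps, so a client counts as idle only when all paths are quiet. A hypothetical reaper built on those two methods (names and the threshold are illustrative):

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    /** Hypothetical idle-connection reaper; not part of the module. */
    public class IdleClientReaper {
        /** Clients keyed by remote process ID. */
        private final Map<UUID, HadoopTcpNioCommunicationClient> clients = new ConcurrentHashMap<>();

        /** Closes and removes clients idle longer than the given threshold. */
        void reap(long maxIdleMs) {
            for (Map.Entry<UUID, HadoopTcpNioCommunicationClient> e : clients.entrySet()) {
                HadoopTcpNioCommunicationClient client = e.getValue();

                // Assuming super.close() returns true only for the first caller
                // to close the client, only that caller removes the mapping.
                if (client.getIdleTime() > maxIdleMs && client.close())
                    clients.remove(e.getKey(), client);
            }
        }
    }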
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
new file mode 100644
index 0000000..090b336
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.jobtracker;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
+
+/**
+ * Hadoop job metadata. Internal object used for distributed job state tracking.
+ */
+public class HadoopJobMetadata implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ private HadoopJobId jobId;
+
+ /** Job info. */
+ private HadoopJobInfo jobInfo;
+
+ /** ID of the node that submitted the job. */
+ private UUID submitNodeId;
+
+ /** Map-reduce plan. */
+ private HadoopMapReducePlan mrPlan;
+
+ /** Pending splits for which mapper should be executed. */
+ private Map<HadoopInputSplit, Integer> pendingSplits;
+
+ /** Pending reducers. */
+ private Collection<Integer> pendingReducers;
+
+ /** Reducer addresses. */
+ @GridToStringInclude
+ private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
+
+ /** Job phase. */
+ private HadoopJobPhase phase = PHASE_SETUP;
+
+ /** Fail cause. */
+ @GridToStringExclude
+ private Throwable failCause;
+
+ /** Version. */
+ private long ver;
+
+ /** Job counters. */
+ private HadoopCounters counters = new HadoopCountersImpl();
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public HadoopJobMetadata() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param submitNodeId Submit node ID.
+ * @param jobId Job ID.
+ * @param jobInfo Job info.
+ */
+ public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) {
+ this.jobId = jobId;
+ this.jobInfo = jobInfo;
+ this.submitNodeId = submitNodeId;
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param src Metadata to copy.
+ */
+ public HadoopJobMetadata(HadoopJobMetadata src) {
+ // Make sure to preserve alphabetic order.
+ counters = src.counters;
+ failCause = src.failCause;
+ jobId = src.jobId;
+ jobInfo = src.jobInfo;
+ mrPlan = src.mrPlan;
+ pendingReducers = src.pendingReducers;
+ pendingSplits = src.pendingSplits;
+ phase = src.phase;
+ reducersAddrs = src.reducersAddrs;
+ submitNodeId = src.submitNodeId;
+ ver = src.ver + 1;
+ }
+
+ /**
+ * @return Submit node ID.
+ */
+ public UUID submitNodeId() {
+ return submitNodeId;
+ }
+
+ /**
+ * @param phase Job phase.
+ */
+ public void phase(HadoopJobPhase phase) {
+ this.phase = phase;
+ }
+
+ /**
+ * @return Job phase.
+ */
+ public HadoopJobPhase phase() {
+ return phase;
+ }
+
+ /**
+ * Gets reducer addresses for external execution.
+ *
+ * @return Reducer addresses.
+ */
+ public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
+ return reducersAddrs;
+ }
+
+ /**
+ * Sets reducer addresses for external execution.
+ *
+ * @param reducersAddrs Map of addresses.
+ */
+ public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
+ this.reducersAddrs = reducersAddrs;
+ }
+
+ /**
+ * Sets the map of pending splits.
+ *
+ * @param pendingSplits Map from input split to task number.
+ */
+ public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) {
+ this.pendingSplits = pendingSplits;
+ }
+
+ /**
+ * Gets the map of pending splits.
+ *
+ * @return Map from input split to task number.
+ */
+ public Map<HadoopInputSplit, Integer> pendingSplits() {
+ return pendingSplits;
+ }
+
+ /**
+ * Sets collection of pending reducers.
+ *
+ * @param pendingReducers Collection of pending reducers.
+ */
+ public void pendingReducers(Collection<Integer> pendingReducers) {
+ this.pendingReducers = pendingReducers;
+ }
+
+ /**
+ * Gets collection of pending reducers.
+ *
+ * @return Collection of pending reducers.
+ */
+ public Collection<Integer> pendingReducers() {
+ return pendingReducers;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @param mrPlan Map-reduce plan.
+ */
+ public void mapReducePlan(HadoopMapReducePlan mrPlan) {
+ assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
+
+ this.mrPlan = mrPlan;
+ }
+
+ /**
+ * @return Map-reduce plan.
+ */
+ public HadoopMapReducePlan mapReducePlan() {
+ return mrPlan;
+ }
+
+ /**
+ * @return Job info.
+ */
+ public HadoopJobInfo jobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * Returns job counters.
+ *
+ * @return Collection of counters.
+ */
+ public HadoopCounters counters() {
+ return counters;
+ }
+
+ /**
+ * Sets counters.
+ *
+ * @param counters Collection of counters.
+ */
+ public void counters(HadoopCounters counters) {
+ this.counters = counters;
+ }
+
+ /**
+ * @param failCause Fail cause.
+ */
+ public void failCause(Throwable failCause) {
+ assert failCause != null;
+
+ if (this.failCause == null) // Keep the first error.
+ this.failCause = failCause;
+ }
+
+ /**
+ * @return Fail cause.
+ */
+ public Throwable failCause() {
+ return failCause;
+ }
+
+ /**
+ * @return Version.
+ */
+ public long version() {
+ return ver;
+ }
+
+ /**
+ * @param split Split.
+ * @return Task number.
+ */
+ public int taskNumber(HadoopInputSplit split) {
+ return pendingSplits.get(split);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeUuid(out, submitNodeId);
+ out.writeObject(jobId);
+ out.writeObject(jobInfo);
+ out.writeObject(mrPlan);
+ out.writeObject(pendingSplits);
+ out.writeObject(pendingReducers);
+ out.writeObject(phase);
+ out.writeObject(failCause);
+ out.writeLong(ver);
+ out.writeObject(reducersAddrs);
+ out.writeObject(counters);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ submitNodeId = U.readUuid(in);
+ jobId = (HadoopJobId)in.readObject();
+ jobInfo = (HadoopJobInfo)in.readObject();
+ mrPlan = (HadoopMapReducePlan)in.readObject();
+ pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject();
+ pendingReducers = (Collection<Integer>)in.readObject();
+ phase = (HadoopJobPhase)in.readObject();
+ failCause = (Throwable)in.readObject();
+ ver = in.readLong();
+ reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
+ counters = (HadoopCounters)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
+ "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
+ failCause.getClass().getName());
+ }
+}
\ No newline at end of file
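The copy constructor above bumps ver on every copy, which supports a copy-on-write update style: job state transitions produce a new, higher-versioned metadata object instead of mutating the shared one. A sketch of that pattern (the surrounding helper is illustrative; only the metadata API comes from the class above, and PHASE_MAP is assumed to be a HadoopJobPhase constant alongside PHASE_SETUP):

    import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;

    import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_MAP;

    /** Illustrative copy-on-write transition helper; not part of the module. */
    public class MetadataTransitions {
        /** Returns a new metadata copy advanced to the map phase. */
        static HadoopJobMetadata advanceToMap(HadoopJobMetadata meta) {
            // Copy rather than mutate: readers of the old version are unaffected.
            HadoopJobMetadata copy = new HadoopJobMetadata(meta);

            copy.phase(PHASE_MAP);

            assert copy.version() == meta.version() + 1; // Bumped by the copy constructor.

            return copy;
        }
    }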