You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:42 UTC
[50/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java
index 0000000,8c07038..c09be9a
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java
@@@ -1,0 -1,763 +1,699 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.client.impl.connection;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.logger.java.*;
++import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.client.*;
+ import org.apache.ignite.client.impl.*;
+ import org.apache.ignite.client.util.*;
+ import org.apache.ignite.internal.processors.rest.client.message.*;
-import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.nio.*;
+ import org.apache.ignite.internal.util.nio.ssl.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import javax.net.ssl.*;
+ import java.io.*;
+ import java.net.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.logging.*;
+
+ import static java.util.logging.Level.*;
+ import static org.apache.ignite.client.impl.connection.GridClientConnectionCloseReason.*;
+ import static org.apache.ignite.internal.GridNodeAttributes.*;
+
+ /**
+ * Cached connections manager.
+ */
+ abstract class GridClientConnectionManagerAdapter implements GridClientConnectionManager {
+ /** Count of reconnect retries before init considered failed. */
+ private static final int INIT_RETRY_CNT = 3;
+
+ /** Initialization retry interval. */
+ private static final int INIT_RETRY_INTERVAL = 1000;
+
+ /** Class logger. */
+ private final Logger log;
+
+ /** NIO server. */
+ private GridNioServer srv;
+
+ /** Active connections. */
+ private final ConcurrentMap<InetSocketAddress, GridClientConnection> conns = new ConcurrentHashMap<>();
+
+ /** Active connections of nodes. */
+ private final ConcurrentMap<UUID, GridClientConnection> nodeConns = new ConcurrentHashMap<>();
+
+ /** SSL context. */
+ private final SSLContext sslCtx;
+
+ /** Client configuration. */
+ protected final GridClientConfiguration cfg;
+
+ /** Topology. */
+ private final GridClientTopology top;
+
+ /** Client id. */
+ private final UUID clientId;
+
+ /** Router endpoints to use instead of topology info. */
+ private final Collection<InetSocketAddress> routers;
+
+ /** Closed flag. */
+ private volatile boolean closed;
+
+ /** Shared executor service. */
+ private final ExecutorService executor;
+
+ /** Endpoint striped lock. */
+ private final GridClientStripedLock endpointStripedLock = new GridClientStripedLock(16);
+
+ /** Service for ping requests, {@code null} if HTTP protocol is used. */
+ private final ScheduledExecutorService pingExecutor;
+
+ /** Marshaller ID. */
+ private final Byte marshId;
+
- /** Message writer. */
- @SuppressWarnings("FieldCanBeLocal")
- private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() {
- @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) {
- assert msg != null;
- assert buf != null;
-
- msg.messageWriter(this, nodeId);
-
- return msg.writeTo(buf);
- }
-
- @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out,
- ByteBuffer buf) throws IOException {
- assert msg != null;
- assert out != null;
- assert buf != null;
- assert buf.hasArray();
-
- msg.messageWriter(this, nodeId);
-
- boolean finished = false;
- int cnt = 0;
-
- while (!finished) {
- finished = msg.writeTo(buf);
-
- out.write(buf.array(), 0, buf.position());
-
- cnt += buf.position();
-
- buf.clear();
- }
-
- return cnt;
- }
- };
-
+ /**
+ * @param clientId Client ID.
+ * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one.
+ * @param cfg Client configuration.
+ * @param routers Routers or empty collection to use endpoints from topology info.
+ * @param top Topology.
+ * @param marshId Marshaller ID.
+ * @throws GridClientException In case of error.
+ */
+ @SuppressWarnings("unchecked")
+ protected GridClientConnectionManagerAdapter(UUID clientId,
+ SSLContext sslCtx,
+ GridClientConfiguration cfg,
+ Collection<InetSocketAddress> routers,
+ GridClientTopology top,
+ @Nullable Byte marshId)
+ throws GridClientException {
+ assert clientId != null : "clientId != null";
+ assert cfg != null : "cfg != null";
+ assert routers != null : "routers != null";
+ assert top != null : "top != null";
+
+ this.clientId = clientId;
+ this.sslCtx = sslCtx;
+ this.cfg = cfg;
+ this.routers = new ArrayList<>(routers);
+ this.top = top;
+
+ log = Logger.getLogger(getClass().getName());
+
+ executor = cfg.getExecutorService() != null ? cfg.getExecutorService() :
+ Executors.newCachedThreadPool(new GridClientThreadFactory("exec", true));
+
+ pingExecutor = cfg.getProtocol() == GridClientProtocol.TCP ? Executors.newScheduledThreadPool(
+ Runtime.getRuntime().availableProcessors(), new GridClientThreadFactory("exec", true)) : null;
+
+ this.marshId = marshId;
+
+ if (marshId == null && cfg.getMarshaller() == null)
+ throw new GridClientException("Failed to start client (marshaller is not configured).");
+
+ if (cfg.getProtocol() == GridClientProtocol.TCP) {
+ try {
+ IgniteLogger gridLog = new IgniteJavaLogger(false);
+
+ GridNioFilter[] filters;
+
- GridNioMessageReader msgReader = new GridNioMessageReader() {
- @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg,
- ByteBuffer buf) {
- assert msg != null;
- assert buf != null;
-
- msg.messageReader(this, nodeId);
-
- return msg.readFrom(buf);
- }
-
- @Nullable @Override public GridTcpMessageFactory messageFactory() {
- return null;
- }
- };
-
- GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(msgReader), gridLog, true);
++ GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(), gridLog, true);
+
+ if (sslCtx != null) {
+ GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog);
+
+ sslFilter.directMode(true);
+ sslFilter.clientMode(true);
+
+ filters = new GridNioFilter[]{codecFilter, sslFilter};
+ }
+ else
+ filters = new GridNioFilter[]{codecFilter};
+
+ srv = GridNioServer.builder().address(U.getLocalHost())
+ .port(-1)
+ .listener(new NioListener(log))
+ .filters(filters)
+ .logger(gridLog)
+ .selectorCount(Runtime.getRuntime().availableProcessors())
+ .sendQueueLimit(1024)
+ .byteOrder(ByteOrder.nativeOrder())
+ .tcpNoDelay(cfg.isTcpNoDelay())
+ .directBuffer(true)
+ .directMode(true)
+ .socketReceiveBufferSize(0)
+ .socketSendBufferSize(0)
+ .idleTimeout(Long.MAX_VALUE)
+ .gridName("gridClient")
- .messageWriter(msgWriter)
+ .daemon(cfg.isDaemon())
+ .build();
+
+ srv.start();
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new GridClientException("Failed to start connection server.", e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("BusyWait")
+ @Override public void init(Collection<InetSocketAddress> srvs) throws GridClientException, InterruptedException {
+ init0();
+
+ GridClientException firstEx = null;
+
+ for (int i = 0; i < INIT_RETRY_CNT; i++) {
+ Collection<InetSocketAddress> srvsCp = new ArrayList<>(srvs);
+
+ while (!srvsCp.isEmpty()) {
+ GridClientConnection conn = null;
+
+ try {
+ conn = connect(null, srvsCp);
+
+ conn.topology(cfg.isAutoFetchAttributes(), cfg.isAutoFetchMetrics(), null).get();
+
+ return;
+ }
+ catch (GridServerUnreachableException e) {
+ // No connection could be opened to any of initial addresses - exit to retry loop.
+ assert conn == null :
+ "GridClientConnectionResetException was thrown from GridClientConnection#topology";
+
+ if (firstEx == null)
+ firstEx = e;
+
+ break;
+ }
+ catch (GridClientConnectionResetException e) {
+ // Connection was established but topology update failed -
+ // trying other initial addresses if any.
+ assert conn != null : "GridClientConnectionResetException was thrown from connect()";
+
+ if (firstEx == null)
+ firstEx = e;
+
+ if (!srvsCp.remove(conn.serverAddress()))
+ // We have misbehaving collection or equals - just exit to avoid infinite loop.
+ break;
+ }
+ }
+
+ Thread.sleep(INIT_RETRY_INTERVAL);
+ }
+
+ for (GridClientConnection c : conns.values()) {
+ conns.remove(c.serverAddress(), c);
+
+ c.close(FAILED, false);
+ }
+
+ throw firstEx;
+ }
+
+ /**
+ * Additional initialization.
+ *
+ * @throws GridClientException In case of error.
+ */
+ protected abstract void init0() throws GridClientException;
+
+ /**
+ * Gets active communication facade.
+ *
+ * @param node Remote node to which connection should be established.
+ * @throws GridServerUnreachableException If none of the servers can be reached after the exception.
+ * @throws GridClientClosedException If client was closed manually.
+ * @throws InterruptedException If connection was interrupted.
+ */
+ @Override public GridClientConnection connection(GridClientNode node)
+ throws GridClientClosedException, GridServerUnreachableException, InterruptedException {
+ assert node != null;
+
+ // Use router's connections if defined.
+ if (!routers.isEmpty())
+ return connection(null, routers);
+
+ GridClientConnection conn = nodeConns.get(node.nodeId());
+
+ if (conn != null) {
+ // Ignore closed connections.
+ if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime()))
+ closeIdle();
+ else
+ return conn;
+ }
+
+ // Use node's connection, if node is available over rest.
+ Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true);
+
+ List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size());
+
+ for (InetSocketAddress endpoint : endpoints)
+ if (!endpoint.isUnresolved())
+ resolvedEndpoints.add(endpoint);
+
+ if (resolvedEndpoints.isEmpty()) {
+ throw new GridServerUnreachableException("No available endpoints to connect " +
+ "(is rest enabled for this node?): " + node);
+ }
+
+ boolean sameHost = node.attributes().isEmpty() ||
+ F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", "));
+
+ Collection<InetSocketAddress> srvs = new LinkedHashSet<>();
+
+ if (sameHost) {
+ Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true));
+
+ srvs.addAll(resolvedEndpoints);
+ }
+ else {
+ for (InetSocketAddress endpoint : resolvedEndpoints)
+ if (!endpoint.getAddress().isLoopbackAddress())
+ srvs.add(endpoint);
+ }
+
+ return connection(node.nodeId(), srvs);
+ }
+
+ /**
+ * Returns connection to one of the given addresses.
+ *
+ * @param nodeId {@code UUID} of node for mapping with connection.
+ * {@code null} if no need of mapping.
+ * @param srvs Collection of addresses to connect to.
+ * @return Connection to use for operations, targeted for the given node.
+ * @throws GridServerUnreachableException If connection can't be established.
+ * @throws GridClientClosedException If connections manager has been closed already.
+ * @throws InterruptedException If connection was interrupted.
+ */
+ public GridClientConnection connection(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs)
+ throws GridServerUnreachableException, GridClientClosedException, InterruptedException {
+ if (srvs == null || srvs.isEmpty())
+ throw new GridServerUnreachableException("Failed to establish connection to the grid" +
+ " (address list is empty).");
+
+ checkClosed();
+
+ // Search for existent connection.
+ for (InetSocketAddress endPoint : srvs) {
+ assert endPoint != null;
+
+ GridClientConnection conn = conns.get(endPoint);
+
+ if (conn == null)
+ continue;
+
+ // Ignore closed connections.
+ if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) {
+ closeIdle();
+
+ continue;
+ }
+
+ if (nodeId != null)
+ nodeConns.put(nodeId, conn);
+
+ return conn;
+ }
+
+ return connect(nodeId, srvs);
+ }
+
+ /**
+ * Creates a connected facade and returns it. Called either from constructor or inside
+ * a write lock.
+ *
+ * @param nodeId {@code UUID} of node for mapping with connection.
+ * {@code null} if no need of mapping.
+ * @param srvs List of server addresses that this method will try to connect to.
+ * @return Established connection.
+ * @throws GridServerUnreachableException If none of the servers can be reached.
+ * @throws InterruptedException If connection was interrupted.
+ */
+ protected GridClientConnection connect(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs)
+ throws GridServerUnreachableException, InterruptedException {
+ if (srvs.isEmpty())
+ throw new GridServerUnreachableException("Failed to establish connection to the grid node (address " +
+ "list is empty).");
+
+ Exception cause = null;
+
+ for (InetSocketAddress srv : srvs) {
+ try {
+ return connect(nodeId, srv);
+ }
+ catch (InterruptedException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ if (cause == null)
+ cause = e;
+ else if (log.isLoggable(INFO))
+ log.info("Unable to connect to grid node [srvAddr=" + srv + ", msg=" + e.getMessage() + ']');
+ }
+ }
+
+ assert cause != null;
+
+ throw new GridServerUnreachableException("Failed to connect to any of the servers in list: " + srvs, cause);
+ }
+
+ /**
+ * Create new connection to specified server.
+ *
+ * @param nodeId {@code UUID} of node for mapping with connection.
+ * {@code null} if no need of mapping.
+ * @param addr Remote socket to connect.
+ * @return Established connection.
+ * @throws IOException If connection failed.
+ * @throws GridClientException If protocol error happened.
+ * @throws InterruptedException If thread was interrupted before connection was established.
+ */
+ protected GridClientConnection connect(@Nullable UUID nodeId, InetSocketAddress addr)
+ throws IOException, GridClientException, InterruptedException {
+ endpointStripedLock.lock(addr);
+
+ try {
+ GridClientConnection old = conns.get(addr);
+
+ if (old != null) {
+ if (old.isClosed()) {
+ conns.remove(addr, old);
+
+ if (nodeId != null)
+ nodeConns.remove(nodeId, old);
+ }
+ else {
+ if (nodeId != null)
+ nodeConns.put(nodeId, old);
+
+ return old;
+ }
+ }
+
+ GridSecurityCredentials cred = null;
+
+ try {
+ if (cfg.getSecurityCredentialsProvider() != null)
+ cred = cfg.getSecurityCredentialsProvider().credentials();
+ }
+ catch (IgniteCheckedException e) {
+ throw new GridClientException("Failed to obtain client credentials.", e);
+ }
+
+ GridClientConnection conn;
+
+ if (cfg.getProtocol() == GridClientProtocol.TCP) {
+ conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
+ cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+ cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepPortablesThreadLocal());
+ }
+ else
+ throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " +
+ cfg.getProtocol());
+
+ old = conns.putIfAbsent(addr, conn);
+
+ assert old == null;
+
+ if (nodeId != null)
+ nodeConns.put(nodeId, conn);
+
+ return conn;
+ }
+ finally {
+ endpointStripedLock.unlock(addr);
+ }
+ }
+
+ /**
+ * @return Get thread local used to enable keep portables mode.
+ */
+ protected ThreadLocal<Boolean> keepPortablesThreadLocal() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void terminateConnection(GridClientConnection conn, GridClientNode node, Throwable e) {
+ if (log.isLoggable(Level.FINE))
+ log.fine("Connection with remote node was terminated [node=" + node + ", srvAddr=" +
+ conn.serverAddress() + ", errMsg=" + e.getMessage() + ']');
+
+ closeIdle();
+
+ conn.close(FAILED, false);
+ }
+
+ /**
+ * Closes all opened connections.
+ *
+ * @param waitCompletion If {@code true} waits for all pending requests to be proceeded.
+ */
+ @SuppressWarnings("TooBroadScope")
+ @Override public void stop(boolean waitCompletion) {
+ Collection<GridClientConnection> closeConns;
+
+ if (closed)
+ return;
+
+ // Mark manager as closed.
+ closed = true;
+
+ // Remove all connections from cache.
+ closeConns = new ArrayList<>(conns.values());
+
+ conns.clear();
+
+ nodeConns.clear();
+
+ // Close old connection outside the writer lock.
+ for (GridClientConnection conn : closeConns)
+ conn.close(CLIENT_CLOSED, waitCompletion);
+
+ if (pingExecutor != null)
+ GridClientUtils.shutdownNow(GridClientConnectionManager.class, pingExecutor, log);
+
+ GridClientUtils.shutdownNow(GridClientConnectionManager.class, executor, log);
+
+ if (srv != null)
+ srv.stop();
+ }
+
+ /**
+ * Close all connections idling for more then
+ * {@link GridClientConfiguration#getMaxConnectionIdleTime()} milliseconds.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private void closeIdle() {
+ for (Iterator<Map.Entry<UUID, GridClientConnection>> it = nodeConns.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<UUID, GridClientConnection> entry = it.next();
+
+ GridClientConnection conn = entry.getValue();
+
+ if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) {
+ conns.remove(conn.serverAddress(), conn);
+
+ nodeConns.remove(entry.getKey(), conn);
+ }
+ }
+
+ for (GridClientConnection conn : conns.values())
+ if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime()))
+ conns.remove(conn.serverAddress(), conn);
+ }
+
+ /**
+ * Checks and throws an exception if this client was closed.
+ *
+ * @throws GridClientClosedException If client was closed.
+ */
+ private void checkClosed() throws GridClientClosedException {
+ if (closed)
+ throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore).");
+ }
+
+ /**
+ */
+ private static class NioListener implements GridNioServerListener {
+ /** */
+ private final Logger log;
+
+ /**
+ * @param log Logger.
+ */
+ private NioListener(Logger log) {
+ this.log = log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onConnected(GridNioSession ses) {
+ if (log.isLoggable(Level.FINE))
+ log.fine("Session connected: " + ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ if (log.isLoggable(Level.FINE))
+ log.fine("Session disconnected: " + ses);
+
+ GridClientFutureAdapter<Boolean> handshakeFut =
+ ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
+
+ if (handshakeFut != null)
+ handshakeFut.onDone(
+ new GridClientConnectionResetException("Failed to perform handshake (connection failed)."));
+ else {
+ GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);
+
+ if (conn != null)
+ conn.close(FAILED, false);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(GridNioSession ses, Object msg) {
+ GridClientFutureAdapter<Boolean> handshakeFut =
+ ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
+
+ if (handshakeFut != null) {
+ assert msg instanceof GridClientHandshakeResponse;
+
+ handleHandshakeResponse(handshakeFut, (GridClientHandshakeResponse)msg);
+ }
+ else {
+ GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);
+
+ assert conn != null;
+
+ if (msg instanceof GridClientMessageWrapper) {
+ GridClientMessageWrapper req = (GridClientMessageWrapper)msg;
+
+ if (req.messageSize() != 0) {
+ assert req.message() != null;
+
+ conn.handleResponse(req);
+ }
+ else
+ conn.handlePingResponse();
+ }
+ else {
+ assert msg instanceof GridClientPingPacket : msg;
+
+ conn.handlePingResponse();
+ }
+ }
+ }
+
+ /**
+ * Handles client handshake response.
+ *
+ * @param handshakeFut Future.
+ * @param msg A handshake response.
+ */
+ private void handleHandshakeResponse(GridClientFutureAdapter<Boolean> handshakeFut,
+ GridClientHandshakeResponse msg) {
+ byte rc = msg.resultCode();
+
+ if (rc != GridClientHandshakeResponse.OK.resultCode()) {
+ handshakeFut.onDone(new GridClientHandshakeException(rc,
+ "Handshake failed due to internal error (see server log for more details)."));
+ }
+ else
+ handshakeFut.onDone(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionWriteTimeout(GridNioSession ses) {
+ if (log.isLoggable(Level.FINE))
+ log.fine("Closing NIO session because of write timeout.");
+
+ ses.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionIdleTimeout(GridNioSession ses) {
+ if (log.isLoggable(Level.FINE))
+ log.fine("Closing NIO session because of idle timeout.");
+
+ ses.close();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class NioParser implements GridNioParser {
+ /** Message metadata key. */
+ private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
- /** Message reader. */
- private final GridNioMessageReader msgReader;
-
- /**
- * @param msgReader Message reader.
- */
- NioParser(GridNioMessageReader msgReader) {
- this.msgReader = msgReader;
- }
-
+ /** {@inheritDoc} */
- @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
++ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf)
++ throws IOException, IgniteCheckedException {
+ GridClientFutureAdapter<?> handshakeFut = ses.meta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
+
+ if (handshakeFut != null) {
+ byte code = buf.get();
+
+ return new GridClientHandshakeResponse(code);
+ }
+
+ GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY);
+
+ if (msg == null && buf.hasRemaining()) {
+ byte type = buf.get();
+
+ if (type == GridClientMessageWrapper.REQ_HEADER)
+ msg = new GridClientMessageWrapper();
+ else
+ throw new IOException("Invalid message type: " + type);
+ }
+
+ boolean finished = false;
+
+ if (buf.hasRemaining())
- finished = msgReader.read(null, msg, buf);
++ finished = msg.readFrom(buf);
+
+ if (finished)
+ return msg;
+ else {
+ ses.addMeta(MSG_META_KEY, msg);
+
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+ // No encoding needed for direct messages.
+ throw new UnsupportedEncodingException();
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java
index 0000000,ba2cc86..924f349
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java
@@@ -1,0 -1,215 +1,211 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal;
+
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Job cancellation request.
+ */
+ public class GridJobCancelRequest extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private IgniteUuid sesId;
+
+ /** */
+ private IgniteUuid jobId;
+
+ /** */
+ private boolean sys;
+
+ /**
+ * No-op constructor to support {@link Externalizable} interface.
+ * This constructor is not meant to be used for other purposes.
+ */
+ public GridJobCancelRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param sesId Task session ID.
+ */
+ public GridJobCancelRequest(IgniteUuid sesId) {
+ assert sesId != null;
+
+ this.sesId = sesId;
+ }
+
+ /**
+ * @param sesId Task session ID.
+ * @param jobId Job ID.
+ */
+ public GridJobCancelRequest(@Nullable IgniteUuid sesId, @Nullable IgniteUuid jobId) {
+ assert sesId != null || jobId != null;
+
+ this.sesId = sesId;
+ this.jobId = jobId;
+ }
+
+ /**
+ * @param sesId Session ID.
+ * @param jobId Job ID.
+ * @param sys System flag.
+ */
+ public GridJobCancelRequest(@Nullable IgniteUuid sesId, @Nullable IgniteUuid jobId, boolean sys) {
+ assert sesId != null || jobId != null;
+
+ this.sesId = sesId;
+ this.jobId = jobId;
+ this.sys = sys;
+ }
+
+ /**
+ * Gets execution ID of task to be cancelled.
+ *
+ * @return Execution ID of task to be cancelled.
+ */
+ @Nullable public IgniteUuid sessionId() {
+ return sesId;
+ }
+
+ /**
+ * Gets session ID of job to be cancelled. If {@code null}, then
+ * all jobs for the specified task execution ID will be cancelled.
+ *
+ * @return Execution ID of job to be cancelled.
+ */
+ @Nullable public IgniteUuid jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return {@code True} if request to cancel is sent out of system when task
+ * has already been reduced and further results are no longer interesting.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridJobCancelRequest _clone = new GridJobCancelRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridJobCancelRequest _clone = (GridJobCancelRequest)_msg;
+
+ _clone.sesId = sesId;
+ _clone.jobId = jobId;
+ _clone.sys = sys;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putGridUuid(jobId))
++ if (!commState.putGridUuid("jobId", jobId))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putGridUuid(sesId))
++ if (!commState.putGridUuid("sesId", sesId))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putBoolean(sys))
++ if (!commState.putBoolean("sys", sys))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- IgniteUuid jobId0 = commState.getGridUuid();
++ jobId = commState.getGridUuid("jobId");
+
- if (jobId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- jobId = jobId0;
-
+ commState.idx++;
+
+ case 1:
- IgniteUuid sesId0 = commState.getGridUuid();
++ sesId = commState.getGridUuid("sesId");
+
- if (sesId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- sesId = sesId0;
-
+ commState.idx++;
+
+ case 2:
- if (buf.remaining() < 1)
- return false;
++ sys = commState.getBoolean("sys");
+
- sys = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridJobCancelRequest.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index 0000000,ae897e6..929f718
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@@ -1,0 -1,941 +1,919 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal;
+
+ import org.apache.ignite.compute.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Job execution request.
+ */
+ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter implements GridTaskMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** */
+ private IgniteUuid sesId;
+
+ /** */
+ private IgniteUuid jobId;
+
+ /** */
+ @GridToStringExclude
+ private byte[] jobBytes;
+
+ /** */
+ @GridToStringExclude
+ @GridDirectTransient
+ private ComputeJob job;
+
+ /** */
+ private long startTaskTime;
+
+ /** */
+ private long timeout;
+
+ /** */
+ private String taskName;
+
+ /** */
+ private String userVer;
+
+ /** */
+ private String taskClsName;
+
+ /** Node class loader participants. */
+ @GridToStringInclude
+ @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+ private Map<UUID, IgniteUuid> ldrParticipants;
+
+ /** */
+ @GridToStringExclude
+ private byte[] sesAttrsBytes;
+
+ /** */
+ @GridToStringExclude
+ @GridDirectTransient
+ private Map<Object, Object> sesAttrs;
+
+ /** */
+ @GridToStringExclude
+ private byte[] jobAttrsBytes;
+
+ /** */
+ @GridToStringExclude
+ @GridDirectTransient
+ private Map<? extends Serializable, ? extends Serializable> jobAttrs;
+
+ /** Checkpoint SPI name. */
+ private String cpSpi;
+
+ /** */
+ @GridDirectTransient
+ private Collection<ComputeJobSibling> siblings;
+
+ /** */
+ private byte[] siblingsBytes;
+
+ /** Transient since needs to hold local creation time. */
+ @GridDirectTransient
+ private long createTime0 = U.currentTimeMillis();
+
+ /** @deprecated need to remove and use only {@link #createTime0}. */
+ @Deprecated
+ private long createTime = createTime0;
+
+ /** */
+ private IgniteUuid clsLdrId;
+
+ /** */
+ private IgniteDeploymentMode depMode;
+
+ /** */
+ private boolean dynamicSiblings;
+
+ /** */
+ private boolean forceLocDep;
+
+ /** */
+ private boolean sesFullSup;
+
+ /** */
+ private boolean internal;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private Collection<UUID> top;
+
+ /**
+ * No-op constructor to support {@link Externalizable} interface.
+ */
+ public GridJobExecuteRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param sesId Task session ID.
+ * @param jobId Job ID.
+ * @param taskName Task name.
+ * @param userVer Code version.
+ * @param taskClsName Fully qualified task name.
+ * @param jobBytes Job serialized body.
+ * @param job Job.
+ * @param startTaskTime Task execution start time.
+ * @param timeout Task execution timeout.
+ * @param top Topology.
+ * @param siblingsBytes Serialized collection of split siblings.
+ * @param siblings Collection of split siblings.
+ * @param sesAttrsBytes Map of session attributes.
+ * @param sesAttrs Session attributes.
+ * @param jobAttrsBytes Job context attributes.
+ * @param jobAttrs Job attributes.
+ * @param cpSpi Collision SPI.
+ * @param clsLdrId Task local class loader id.
+ * @param depMode Task deployment mode.
+ * @param dynamicSiblings {@code True} if siblings are dynamic.
+ * @param ldrParticipants Other node class loader IDs that can also load classes.
+ * @param forceLocDep {@code True} If remote node should ignore deployment settings.
+ * @param sesFullSup {@code True} if session attributes are disabled.
+ * @param internal {@code True} if internal job.
+ * @param subjId Subject ID.
+ */
+ public GridJobExecuteRequest(
+ IgniteUuid sesId,
+ IgniteUuid jobId,
+ String taskName,
+ String userVer,
+ String taskClsName,
+ byte[] jobBytes,
+ ComputeJob job,
+ long startTaskTime,
+ long timeout,
+ @Nullable Collection<UUID> top,
+ byte[] siblingsBytes,
+ Collection<ComputeJobSibling> siblings,
+ byte[] sesAttrsBytes,
+ Map<Object, Object> sesAttrs,
+ byte[] jobAttrsBytes,
+ Map<? extends Serializable, ? extends Serializable> jobAttrs,
+ String cpSpi,
+ IgniteUuid clsLdrId,
+ IgniteDeploymentMode depMode,
+ boolean dynamicSiblings,
+ Map<UUID, IgniteUuid> ldrParticipants,
+ boolean forceLocDep,
+ boolean sesFullSup,
+ boolean internal,
+ UUID subjId) {
+ this.top = top;
+ assert sesId != null;
+ assert jobId != null;
+ assert taskName != null;
+ assert taskClsName != null;
+ assert job != null || jobBytes != null;
+ assert sesAttrs != null || sesAttrsBytes != null || !sesFullSup;
+ assert jobAttrs != null || jobAttrsBytes != null;
+ assert clsLdrId != null;
+ assert userVer != null;
+ assert depMode != null;
+
+ this.sesId = sesId;
+ this.jobId = jobId;
+ this.taskName = taskName;
+ this.userVer = userVer;
+ this.taskClsName = taskClsName;
+ this.jobBytes = jobBytes;
+ this.job = job;
+ this.startTaskTime = startTaskTime;
+ this.timeout = timeout;
+ this.top = top;
+ this.siblingsBytes = siblingsBytes;
+ this.siblings = siblings;
+ this.sesAttrsBytes = sesAttrsBytes;
+ this.sesAttrs = sesAttrs;
+ this.jobAttrsBytes = jobAttrsBytes;
+ this.jobAttrs = jobAttrs;
+ this.clsLdrId = clsLdrId;
+ this.depMode = depMode;
+ this.dynamicSiblings = dynamicSiblings;
+ this.ldrParticipants = ldrParticipants;
+ this.forceLocDep = forceLocDep;
+ this.sesFullSup = sesFullSup;
+ this.internal = internal;
+ this.subjId = subjId;
+
+ this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid getSessionId() {
+ return sesId;
+ }
+
+ /**
+ * @return Job session ID.
+ */
+ public IgniteUuid getJobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Task version.
+ */
+ public String getTaskClassName() {
+ return taskClsName;
+ }
+
+ /**
+ * @return Task name.
+ */
+ public String getTaskName() {
+ return taskName;
+ }
+
+ /**
+ * @return Task version.
+ */
+ public String getUserVersion() {
+ return userVer;
+ }
+
+ /**
+ * @return Serialized job bytes.
+ */
+ public byte[] getJobBytes() {
+ return jobBytes;
+ }
+
+ /**
+ * @return Grid job.
+ */
+ public ComputeJob getJob() {
+ return job;
+ }
+
+ /**
+ * @return Task start time.
+ */
+ public long getStartTaskTime() {
+ return startTaskTime;
+ }
+
+ /**
+ * @return Timeout.
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Gets this instance creation time.
+ *
+ * @return This instance creation time.
+ */
+ public long getCreateTime() {
+ return createTime0;
+ }
+
+ /**
+ * @return Serialized collection of split siblings.
+ */
+ public byte[] getSiblingsBytes() {
+ return siblingsBytes;
+ }
+
+ /**
+ * @return Job siblings.
+ */
+ public Collection<ComputeJobSibling> getSiblings() {
+ return siblings;
+ }
+
+ /**
+ * @return Session attributes.
+ */
+ public byte[] getSessionAttributesBytes() {
+ return sesAttrsBytes;
+ }
+
+ /**
+ * @return Session attributes.
+ */
+ public Map<Object, Object> getSessionAttributes() {
+ return sesAttrs;
+ }
+
+ /**
+ * @return Job attributes.
+ */
+ public byte[] getJobAttributesBytes() {
+ return jobAttrsBytes;
+ }
+
+ /**
+ * @return Job attributes.
+ */
+ public Map<? extends Serializable, ? extends Serializable> getJobAttributes() {
+ return jobAttrs;
+ }
+
+ /**
+ * @return Checkpoint SPI name.
+ */
+ public String getCheckpointSpi() {
+ return cpSpi;
+ }
+
+ /**
+ * @return Task local class loader id.
+ */
+ public IgniteUuid getClassLoaderId() {
+ return clsLdrId;
+ }
+
+ /**
+ * @return Deployment mode.
+ */
+ public IgniteDeploymentMode getDeploymentMode() {
+ return depMode;
+ }
+
+ /**
+ * Returns true if siblings list is dynamic, i.e. task is continuous.
+ *
+ * @return True if siblings list is dynamic.
+ */
+ public boolean isDynamicSiblings() {
+ return dynamicSiblings;
+ }
+
+ /**
+ * @return Node class loader participant map.
+ */
+ public Map<UUID, IgniteUuid> getLoaderParticipants() {
+ return ldrParticipants;
+ }
+
+ /**
+ * @return Returns {@code true} if deployment should always be used.
+ */
+ public boolean isForceLocalDeployment() {
+ return forceLocDep;
+ }
+
+ /**
+ * @return Topology.
+ */
+ @Nullable public Collection<UUID> topology() {
+ return top;
+ }
+ /**
+ * @return {@code True} if session attributes are enabled.
+ */
+ public boolean isSessionFullSupport() {
+ return sesFullSup;
+ }
+
+ /**
+ * @return {@code True} if internal job.
+ */
+ public boolean isInternal() {
+ return internal;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ public UUID getSubjectId() {
+ return subjId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridJobExecuteRequest _clone = new GridJobExecuteRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridJobExecuteRequest _clone = (GridJobExecuteRequest)_msg;
+
+ _clone.subjId = subjId;
+ _clone.sesId = sesId;
+ _clone.jobId = jobId;
+ _clone.jobBytes = jobBytes;
+ _clone.job = job;
+ _clone.startTaskTime = startTaskTime;
+ _clone.timeout = timeout;
+ _clone.taskName = taskName;
+ _clone.userVer = userVer;
+ _clone.taskClsName = taskClsName;
+ _clone.ldrParticipants = ldrParticipants;
+ _clone.sesAttrsBytes = sesAttrsBytes;
+ _clone.sesAttrs = sesAttrs;
+ _clone.jobAttrsBytes = jobAttrsBytes;
+ _clone.jobAttrs = jobAttrs;
+ _clone.cpSpi = cpSpi;
+ _clone.siblings = siblings;
+ _clone.siblingsBytes = siblingsBytes;
+ _clone.createTime0 = createTime0;
+ _clone.createTime = createTime;
+ _clone.clsLdrId = clsLdrId;
+ _clone.depMode = depMode;
+ _clone.dynamicSiblings = dynamicSiblings;
+ _clone.forceLocDep = forceLocDep;
+ _clone.sesFullSup = sesFullSup;
+ _clone.internal = internal;
+ _clone.top = top;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putGridUuid(clsLdrId))
++ if (!commState.putGridUuid("clsLdrId", clsLdrId))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putString(cpSpi))
++ if (!commState.putString("cpSpi", cpSpi))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putLong(createTime))
++ if (!commState.putLong("createTime", createTime))
+ return false;
+
+ commState.idx++;
+
+ case 3:
- if (!commState.putEnum(depMode))
++ if (!commState.putEnum("depMode", depMode))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putBoolean(dynamicSiblings))
++ if (!commState.putBoolean("dynamicSiblings", dynamicSiblings))
+ return false;
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putBoolean(forceLocDep))
++ if (!commState.putBoolean("forceLocDep", forceLocDep))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putBoolean(internal))
++ if (!commState.putBoolean("internal", internal))
+ return false;
+
+ commState.idx++;
+
+ case 7:
- if (!commState.putByteArray(jobAttrsBytes))
++ if (!commState.putByteArray("jobAttrsBytes", jobAttrsBytes))
+ return false;
+
+ commState.idx++;
+
+ case 8:
- if (!commState.putByteArray(jobBytes))
++ if (!commState.putByteArray("jobBytes", jobBytes))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putGridUuid(jobId))
++ if (!commState.putGridUuid("jobId", jobId))
+ return false;
+
+ commState.idx++;
+
+ case 10:
+ if (ldrParticipants != null) {
+ if (commState.it == null) {
- if (!commState.putInt(ldrParticipants.size()))
++ if (!commState.putInt(null, ldrParticipants.size()))
+ return false;
+
+ commState.it = ldrParticipants.entrySet().iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
+ Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, IgniteUuid>)commState.cur;
+
+ if (!commState.keyDone) {
- if (!commState.putUuid(e.getKey()))
++ if (!commState.putUuid(null, e.getKey()))
+ return false;
+
+ commState.keyDone = true;
+ }
+
- if (!commState.putGridUuid(e.getValue()))
++ if (!commState.putGridUuid(null, e.getValue()))
+ return false;
+
+ commState.keyDone = false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putByteArray(sesAttrsBytes))
++ if (!commState.putByteArray("sesAttrsBytes", sesAttrsBytes))
+ return false;
+
+ commState.idx++;
+
+ case 12:
- if (!commState.putBoolean(sesFullSup))
++ if (!commState.putBoolean("sesFullSup", sesFullSup))
+ return false;
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putGridUuid(sesId))
++ if (!commState.putGridUuid("sesId", sesId))
+ return false;
+
+ commState.idx++;
+
+ case 14:
- if (!commState.putByteArray(siblingsBytes))
++ if (!commState.putByteArray("siblingsBytes", siblingsBytes))
+ return false;
+
+ commState.idx++;
+
+ case 15:
- if (!commState.putLong(startTaskTime))
++ if (!commState.putLong("startTaskTime", startTaskTime))
+ return false;
+
+ commState.idx++;
+
+ case 16:
- if (!commState.putUuid(subjId))
++ if (!commState.putUuid("subjId", subjId))
+ return false;
+
+ commState.idx++;
+
+ case 17:
- if (!commState.putString(taskClsName))
++ if (!commState.putString("taskClsName", taskClsName))
+ return false;
+
+ commState.idx++;
+
+ case 18:
- if (!commState.putString(taskName))
++ if (!commState.putString("taskName", taskName))
+ return false;
+
+ commState.idx++;
+
+ case 19:
- if (!commState.putLong(timeout))
++ if (!commState.putLong("timeout", timeout))
+ return false;
+
+ commState.idx++;
+
+ case 20:
+ if (top != null) {
+ if (commState.it == null) {
- if (!commState.putInt(top.size()))
++ if (!commState.putInt(null, top.size()))
+ return false;
+
+ commState.it = top.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putUuid((UUID)commState.cur))
++ if (!commState.putUuid(null, (UUID)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 21:
- if (!commState.putString(userVer))
++ if (!commState.putString("userVer", userVer))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- IgniteUuid clsLdrId0 = commState.getGridUuid();
++ clsLdrId = commState.getGridUuid("clsLdrId");
+
- if (clsLdrId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- clsLdrId = clsLdrId0;
-
+ commState.idx++;
+
+ case 1:
- String cpSpi0 = commState.getString();
++ cpSpi = commState.getString("cpSpi");
+
- if (cpSpi0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- cpSpi = cpSpi0;
-
+ commState.idx++;
+
+ case 2:
- if (buf.remaining() < 8)
- return false;
++ createTime = commState.getLong("createTime");
+
- createTime = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 3:
- if (buf.remaining() < 1)
- return false;
++ byte depMode0 = commState.getByte("depMode");
+
- byte depMode0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ depMode = IgniteDeploymentMode.fromOrdinal(depMode0);
+
+ commState.idx++;
+
+ case 4:
- if (buf.remaining() < 1)
- return false;
++ dynamicSiblings = commState.getBoolean("dynamicSiblings");
+
- dynamicSiblings = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 5:
- if (buf.remaining() < 1)
- return false;
++ forceLocDep = commState.getBoolean("forceLocDep");
+
- forceLocDep = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 6:
- if (buf.remaining() < 1)
- return false;
++ internal = commState.getBoolean("internal");
+
- internal = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 7:
- byte[] jobAttrsBytes0 = commState.getByteArray();
++ jobAttrsBytes = commState.getByteArray("jobAttrsBytes");
+
- if (jobAttrsBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- jobAttrsBytes = jobAttrsBytes0;
-
+ commState.idx++;
+
+ case 8:
- byte[] jobBytes0 = commState.getByteArray();
++ jobBytes = commState.getByteArray("jobBytes");
+
- if (jobBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- jobBytes = jobBytes0;
-
+ commState.idx++;
+
+ case 9:
- IgniteUuid jobId0 = commState.getGridUuid();
++ jobId = commState.getGridUuid("jobId");
+
- if (jobId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- jobId = jobId0;
-
+ commState.idx++;
+
+ case 10:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (ldrParticipants == null)
+ ldrParticipants = new HashMap<>(commState.readSize, 1.0f);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
+ if (!commState.keyDone) {
- UUID _val = commState.getUuid();
++ UUID _val = commState.getUuid(null);
+
- if (_val == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ commState.cur = _val;
+ commState.keyDone = true;
+ }
+
- IgniteUuid _val = commState.getGridUuid();
++ IgniteUuid _val = commState.getGridUuid(null);
+
- if (_val == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ ldrParticipants.put((UUID)commState.cur, _val);
+
+ commState.keyDone = false;
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+ commState.cur = null;
+
+ commState.idx++;
+
+ case 11:
- byte[] sesAttrsBytes0 = commState.getByteArray();
++ sesAttrsBytes = commState.getByteArray("sesAttrsBytes");
+
- if (sesAttrsBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- sesAttrsBytes = sesAttrsBytes0;
-
+ commState.idx++;
+
+ case 12:
- if (buf.remaining() < 1)
- return false;
++ sesFullSup = commState.getBoolean("sesFullSup");
+
- sesFullSup = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 13:
- IgniteUuid sesId0 = commState.getGridUuid();
++ sesId = commState.getGridUuid("sesId");
+
- if (sesId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- sesId = sesId0;
-
+ commState.idx++;
+
+ case 14:
- byte[] siblingsBytes0 = commState.getByteArray();
++ siblingsBytes = commState.getByteArray("siblingsBytes");
+
- if (siblingsBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- siblingsBytes = siblingsBytes0;
-
+ commState.idx++;
+
+ case 15:
- if (buf.remaining() < 8)
- return false;
++ startTaskTime = commState.getLong("startTaskTime");
+
- startTaskTime = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 16:
+ UUID subjId0 = commState.getUuid();
+
+ if (subjId0 == UUID_NOT_READ)
+ return false;
+
+ subjId = subjId0;
+
+ commState.idx++;
+
+ case 17:
- String taskClsName0 = commState.getString();
++ String taskClsName0 = commState.getString("taskClsName");
+
- if (taskClsName0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- taskClsName = taskClsName0;
-
+ commState.idx++;
+
+ case 18:
- String taskName0 = commState.getString();
++ String taskName0 = commState.getString("taskName");
+
- if (taskName0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- taskName = taskName0;
-
+ commState.idx++;
+
+ case 19:
- if (buf.remaining() < 8)
- return false;
++ timeout = commState.getLong("timeout");
+
- timeout = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 20:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (top == null)
+ top = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- UUID _val = commState.getUuid();
++ UUID _val = commState.getUuid(null);
+
- if (_val == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ top.add((UUID)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 21:
- String userVer0 = commState.getString();
++ String userVer0 = commState.getString("userVer");
+
- if (userVer0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- userVer = userVer0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 81;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridJobExecuteRequest.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
index 0000000,d3e6e56..d2711a7
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
@@@ -1,0 -1,374 +1,362 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Job execution response.
+ */
+ public class GridJobExecuteResponse extends GridTcpCommunicationMessageAdapter implements GridTaskMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private IgniteUuid sesId;
+
+ /** */
+ private IgniteUuid jobId;
+
+ /** */
+ private byte[] gridExBytes;
+
+ /** */
+ @GridDirectTransient
+ private IgniteCheckedException gridEx;
+
+ /** */
+ private byte[] resBytes;
+
+ /** */
+ @GridDirectTransient
+ private Object res;
+
+ /** */
+ private byte[] jobAttrsBytes;
+
+ /** */
+ @GridDirectTransient
+ private Map<Object, Object> jobAttrs;
+
+ /** */
+ private boolean isCancelled;
+
+ /** */
+ @GridToStringExclude
+ @GridDirectTransient
+ private IgniteCheckedException fakeEx;
+
+ /**
+ * No-op constructor to support {@link Externalizable} interface. This
+ * constructor is not meant to be used for other purposes.
+ */
+ public GridJobExecuteResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Sender node ID.
+ * @param sesId Task session ID
+ * @param jobId Job ID.
+ * @param gridExBytes Serialized grid exception.
+ * @param gridEx Grid exception.
+ * @param resBytes Serialized result.
+ * @param res Result.
+ * @param jobAttrsBytes Serialized job attributes.
+ * @param jobAttrs Job attributes.
+ * @param isCancelled Whether job was cancelled or not.
+ */
+ public GridJobExecuteResponse(UUID nodeId, IgniteUuid sesId, IgniteUuid jobId, byte[] gridExBytes,
+ IgniteCheckedException gridEx, byte[] resBytes, Object res, byte[] jobAttrsBytes,
+ Map<Object, Object> jobAttrs, boolean isCancelled) {
+ assert nodeId != null;
+ assert sesId != null;
+ assert jobId != null;
+
+ this.nodeId = nodeId;
+ this.sesId = sesId;
+ this.jobId = jobId;
+ this.gridExBytes = gridExBytes;
+ this.gridEx = gridEx;
+ this.resBytes = resBytes;
+ this.res = res;
+ this.jobAttrsBytes = jobAttrsBytes;
+ this.jobAttrs = jobAttrs;
+ this.isCancelled = isCancelled;
+ }
+
+ /**
+ * @return Task session ID.
+ */
+ @Override public IgniteUuid getSessionId() {
+ return sesId;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public IgniteUuid getJobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Serialized job result.
+ */
+ @Nullable public byte[] getJobResultBytes() {
+ return resBytes;
+ }
+
+ /**
+ * @return Job result.
+ */
+ @Nullable public Object getJobResult() {
+ return res;
+ }
+
+ /**
+ * @return Serialized job exception.
+ */
+ @Nullable public byte[] getExceptionBytes() {
+ return gridExBytes;
+ }
+
+ /**
+ * @return Job exception.
+ */
+ @Nullable public IgniteCheckedException getException() {
+ return gridEx;
+ }
+
+ /**
+ * @return Serialized job attributes.
+ */
+ @Nullable public byte[] getJobAttributesBytes() {
+ return jobAttrsBytes;
+ }
+
+ /**
+ * @return Job attributes.
+ */
+ @Nullable public Map<Object, Object> getJobAttributes() {
+ return jobAttrs;
+ }
+
+ /**
+ * @return Job cancellation status.
+ */
+ public boolean isCancelled() {
+ return isCancelled;
+ }
+
+ /**
+ * @return Sender node ID.
+ */
+ public UUID getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Fake exception.
+ */
+ public IgniteCheckedException getFakeException() {
+ return fakeEx;
+ }
+
+ /**
+ * @param fakeEx Fake exception.
+ */
+ public void setFakeException(IgniteCheckedException fakeEx) {
+ this.fakeEx = fakeEx;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridJobExecuteResponse _clone = new GridJobExecuteResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridJobExecuteResponse _clone = (GridJobExecuteResponse)_msg;
+
+ _clone.nodeId = nodeId;
+ _clone.sesId = sesId;
+ _clone.jobId = jobId;
+ _clone.gridExBytes = gridExBytes;
+ _clone.gridEx = gridEx;
+ _clone.resBytes = resBytes;
+ _clone.res = res;
+ _clone.jobAttrsBytes = jobAttrsBytes;
+ _clone.jobAttrs = jobAttrs;
+ _clone.isCancelled = isCancelled;
+ _clone.fakeEx = fakeEx;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putByteArray(gridExBytes))
++ if (!commState.putByteArray("gridExBytes", gridExBytes))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putBoolean(isCancelled))
++ if (!commState.putBoolean("isCancelled", isCancelled))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putByteArray(jobAttrsBytes))
++ if (!commState.putByteArray("jobAttrsBytes", jobAttrsBytes))
+ return false;
+
+ commState.idx++;
+
+ case 3:
- if (!commState.putGridUuid(jobId))
++ if (!commState.putGridUuid("jobId", jobId))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putUuid(nodeId))
++ if (!commState.putUuid("nodeId", nodeId))
+ return false;
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putByteArray(resBytes))
++ if (!commState.putByteArray("resBytes", resBytes))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putGridUuid(sesId))
++ if (!commState.putGridUuid("sesId", sesId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- byte[] gridExBytes0 = commState.getByteArray();
++ gridExBytes = commState.getByteArray("gridExBytes");
+
- if (gridExBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- gridExBytes = gridExBytes0;
-
+ commState.idx++;
+
+ case 1:
- if (buf.remaining() < 1)
- return false;
++ isCancelled = commState.getBoolean("isCancelled");
+
- isCancelled = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 2:
- byte[] jobAttrsBytes0 = commState.getByteArray();
++ jobAttrsBytes = commState.getByteArray("jobAttrsBytes");
+
- if (jobAttrsBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- jobAttrsBytes = jobAttrsBytes0;
-
+ commState.idx++;
+
+ case 3:
- IgniteUuid jobId0 = commState.getGridUuid();
++ jobId = commState.getGridUuid("jobId");
+
- if (jobId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- jobId = jobId0;
-
+ commState.idx++;
+
+ case 4:
- UUID nodeId0 = commState.getUuid();
++ nodeId = commState.getUuid("nodeId");
+
- if (nodeId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nodeId = nodeId0;
-
+ commState.idx++;
+
+ case 5:
- byte[] resBytes0 = commState.getByteArray();
++ resBytes = commState.getByteArray("resBytes");
+
- if (resBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- resBytes = resBytes0;
-
+ commState.idx++;
+
+ case 6:
- IgniteUuid sesId0 = commState.getGridUuid();
++ sesId = commState.getGridUuid("sesId");
+
- if (sesId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- sesId = sesId0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridJobExecuteResponse.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
index 0000000,6c2d756..abe558f
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java
@@@ -1,0 -1,175 +1,171 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal;
+
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Job siblings request.
+ */
+ public class GridJobSiblingsRequest extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private IgniteUuid sesId;
+
+ /** */
+ @GridDirectTransient
+ private Object topic;
+
+ /** */
+ private byte[] topicBytes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridJobSiblingsRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param sesId Session ID.
+ * @param topic Topic.
+ * @param topicBytes Serialized topic.
+ */
+ public GridJobSiblingsRequest(IgniteUuid sesId, Object topic, byte[] topicBytes) {
+ assert sesId != null;
+ assert topic != null || topicBytes != null;
+
+ this.sesId = sesId;
+ this.topic = topic;
+ this.topicBytes = topicBytes;
+ }
+
+ /**
+ * @return Session ID.
+ */
+ public IgniteUuid sessionId() {
+ return sesId;
+ }
+
+ /**
+ * @return Topic.
+ */
+ public Object topic() {
+ return topic;
+ }
+
+ /**
+ * @return Serialized topic.
+ */
+ public byte[] topicBytes() {
+ return topicBytes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridJobSiblingsRequest _clone = new GridJobSiblingsRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridJobSiblingsRequest _clone = (GridJobSiblingsRequest)_msg;
+
+ _clone.sesId = sesId;
+ _clone.topic = topic;
+ _clone.topicBytes = topicBytes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putGridUuid(sesId))
++ if (!commState.putGridUuid("sesId", sesId))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putByteArray(topicBytes))
++ if (!commState.putByteArray("topicBytes", topicBytes))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- IgniteUuid sesId0 = commState.getGridUuid();
++ sesId = commState.getGridUuid("sesId");
+
- if (sesId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- sesId = sesId0;
-
+ commState.idx++;
+
+ case 1:
- byte[] topicBytes0 = commState.getByteArray();
++ topicBytes = commState.getByteArray("topicBytes");
+
- if (topicBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- topicBytes = topicBytes0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridJobSiblingsRequest.class, this);
+ }
+ }