You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:38 UTC
[15/31] incubator-ignite git commit: ignite-471-2: huge merge from
sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
deleted file mode 100644
index 802da02..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ /dev/null
@@ -1,1160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.*;
-import org.apache.ignite.spi.discovery.tcp.internal.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.messages.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
-
-/**
- * Base class for TCP discovery SPIs.
- */
-abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi {
- /** Default port to listen (value is <tt>47500</tt>). */
- public static final int DFLT_PORT = 47500;
-
- /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
- public static final long DFLT_SOCK_TIMEOUT = 200;
-
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 50;
-
- /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
- public static final long DFLT_NETWORK_TIMEOUT = 5000;
-
- /** Default value for thread priority (value is <tt>10</tt>). */
- public static final int DFLT_THREAD_PRI = 10;
-
- /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */
- public static final long DFLT_HEARTBEAT_FREQ = 100;
-
- /** Default size of topology snapshots history. */
- public static final int DFLT_TOP_HISTORY_SIZE = 1000;
-
- /** Response OK. */
- protected static final int RES_OK = 1;
-
- /** Response CONTINUE JOIN. */
- protected static final int RES_CONTINUE_JOIN = 100;
-
- /** Response WAIT. */
- protected static final int RES_WAIT = 200;
-
- /** Local address. */
- protected String locAddr;
-
- /** IP finder. */
- protected TcpDiscoveryIpFinder ipFinder;
-
- /** Socket operations timeout. */
- protected long sockTimeout = DFLT_SOCK_TIMEOUT;
-
- /** Message acknowledgement timeout. */
- protected long ackTimeout = DFLT_ACK_TIMEOUT;
-
- /** Network timeout. */
- protected long netTimeout = DFLT_NETWORK_TIMEOUT;
-
- /** Thread priority for all threads started by SPI. */
- protected int threadPri = DFLT_THREAD_PRI;
-
- /** Heartbeat messages issuing frequency. */
- protected long hbFreq = DFLT_HEARTBEAT_FREQ;
-
- /** Size of topology snapshots history. */
- protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
-
- /** Grid discovery listener. */
- protected volatile DiscoverySpiListener lsnr;
-
- /** Data exchange. */
- protected DiscoverySpiDataExchange exchange;
-
- /** Metrics provider. */
- protected DiscoveryMetricsProvider metricsProvider;
-
- /** Local node attributes. */
- protected Map<String, Object> locNodeAttrs;
-
- /** Local node version. */
- protected IgniteProductVersion locNodeVer;
-
- /** Local node. */
- protected TcpDiscoveryNode locNode;
-
- /** Local host. */
- protected InetAddress locHost;
-
- /** Internal and external addresses of local node. */
- protected Collection<InetSocketAddress> locNodeAddrs;
-
- /** Socket timeout worker. */
- protected SocketTimeoutWorker sockTimeoutWorker;
-
- /** Discovery state. */
- protected TcpDiscoverySpiState spiState = DISCONNECTED;
-
- /** Start time of the very first grid node. */
- protected volatile long gridStartTime;
-
- /** Marshaller. */
- protected final Marshaller marsh = new JdkMarshaller();
-
- /** Statistics. */
- protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
-
- /** Logger. */
- @LoggerResource
- protected IgniteLogger log;
-
- /**
- * Inject resources
- *
- * @param ignite Ignite.
- */
- @IgniteInstanceResource
- @Override protected void injectResources(Ignite ignite) {
- super.injectResources(ignite);
-
- // Inject resource.
- if (ignite != null)
- setLocalAddress(ignite.configuration().getLocalHost());
- }
-
- /**
- * Sets local host IP address that discovery SPI uses.
- * <p>
- * If not provided, by default a first found non-loopback address
- * will be used. If there is no non-loopback address available,
- * then {@link InetAddress#getLocalHost()} will be used.
- *
- * @param locAddr IP address.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setLocalAddress(String locAddr) {
- // Injection should not override value already set by Spring or user.
- if (this.locAddr == null)
- this.locAddr = locAddr;
- }
-
- /**
- * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
- *
- * @return local address.
- */
- public String getLocalAddress() {
- return locAddr;
- }
-
- /**
- * Gets IP finder for IP addresses sharing and storing.
- *
- * @return IP finder for IP addresses sharing and storing.
- */
- public TcpDiscoveryIpFinder getIpFinder() {
- return ipFinder;
- }
-
- /**
- * Sets IP finder for IP addresses sharing and storing.
- * <p>
- * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
- *
- * @param ipFinder IP finder.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setIpFinder(TcpDiscoveryIpFinder ipFinder) {
- this.ipFinder = ipFinder;
- }
-
- /**
- * Sets socket operations timeout. This timeout is used to limit connection time and
- * write-to-socket time.
- * <p>
- * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value
- * significantly greater than the default (e.g. to {@code 30000}).
- * <p>
- * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}.
- *
- * @param sockTimeout Socket connection timeout.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setSocketTimeout(long sockTimeout) {
- this.sockTimeout = sockTimeout;
- }
-
- /**
- * Sets timeout for receiving acknowledgement for sent message.
- * <p>
- * If acknowledgement is not received within this timeout, sending is considered as failed
- * and SPI tries to repeat message sending.
- * <p>
- * If not specified, default is {@link #DFLT_ACK_TIMEOUT}.
- *
- * @param ackTimeout Acknowledgement timeout.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setAckTimeout(long ackTimeout) {
- this.ackTimeout = ackTimeout;
- }
-
- /**
- * Sets maximum network timeout to use for network operations.
- * <p>
- * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
- *
- * @param netTimeout Network timeout.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setNetworkTimeout(long netTimeout) {
- this.netTimeout = netTimeout;
- }
-
- /**
- * Sets thread priority. All threads within SPI will be started with it.
- * <p>
- * If not provided, default value is {@link #DFLT_THREAD_PRI}
- *
- * @param threadPri Thread priority.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setThreadPriority(int threadPri) {
- this.threadPri = threadPri;
- }
-
- /**
- * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
- * in configurable time interval to other nodes to notify them about its state.
- * <p>
- * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
- *
- * @param hbFreq Heartbeat frequency in milliseconds.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setHeartbeatFrequency(long hbFreq) {
- this.hbFreq = hbFreq;
- }
-
- /**
- * @return Size of topology snapshots history.
- */
- public long getTopHistorySize() {
- return topHistSize;
- }
-
- /**
- * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
- * {@link #DFLT_TOP_HISTORY_SIZE}.
- *
- * @param topHistSize Size of topology snapshots history.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setTopHistorySize(int topHistSize) {
- if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
- U.warn(log, "Topology history size should be greater than or equal to default size. " +
- "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
- ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
-
- return;
- }
-
- this.topHistSize = topHistSize;
- }
-
- /** {@inheritDoc} */
- @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
- assert locNodeAttrs == null;
- assert locNodeVer == null;
-
- if (log.isDebugEnabled()) {
- log.debug("Node attributes to set: " + attrs);
- log.debug("Node version to set: " + ver);
- }
-
- locNodeAttrs = attrs;
- locNodeVer = ver;
- }
-
- /** {@inheritDoc} */
- @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
- super.onContextInitialized0(spiCtx);
-
- ipFinder.onSpiContextInitialized(spiCtx);
- }
-
- /** {@inheritDoc} */
- @Override protected void onContextDestroyed0() {
- super.onContextDestroyed0();
-
- if (ipFinder != null)
- ipFinder.onSpiContextDestroyed();
- }
-
- /** {@inheritDoc} */
- @Override public ClusterNode getLocalNode() {
- return locNode;
- }
-
- /** {@inheritDoc} */
- @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
- this.lsnr = lsnr;
- }
-
- /** {@inheritDoc} */
- @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
- this.exchange = exchange;
- }
-
- /** {@inheritDoc} */
- @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
- this.metricsProvider = metricsProvider;
- }
-
- /** {@inheritDoc} */
- @Override public long getGridStartTime() {
- assert gridStartTime != 0;
-
- return gridStartTime;
- }
-
- /**
- * @param sockAddr Remote address.
- * @return Opened socket.
- * @throws IOException If failed.
- */
- protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
- assert sockAddr != null;
-
- InetSocketAddress resolved = sockAddr.isUnresolved() ?
- new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
-
- InetAddress addr = resolved.getAddress();
-
- assert addr != null;
-
- Socket sock = new Socket();
-
- sock.bind(new InetSocketAddress(locHost, 0));
-
- sock.setTcpNoDelay(true);
-
- sock.connect(resolved, (int)sockTimeout);
-
- writeToSocket(sock, U.IGNITE_HEADER);
-
- return sock;
- }
-
- /**
- * Writes message to the socket.
- *
- * @param sock Socket.
- * @param data Raw data to write.
- * @throws IOException If IO failed or write timed out.
- */
- @SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(Socket sock, byte[] data) throws IOException {
- assert sock != null;
- assert data != null;
-
- SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
-
- sockTimeoutWorker.addTimeoutObject(obj);
-
- IOException err = null;
-
- try {
- OutputStream out = sock.getOutputStream();
-
- out.write(data);
-
- out.flush();
- }
- catch (IOException e) {
- err = e;
- }
- finally {
- boolean cancelled = obj.cancel();
-
- if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
-
- // Throw original exception.
- if (err != null)
- throw err;
-
- if (!cancelled)
- throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
- }
- }
-
- /**
- * Writes message to the socket.
- *
- * @param sock Socket.
- * @param msg Message.
- * @throws IOException If IO failed or write timed out.
- * @throws IgniteCheckedException If marshalling failed.
- */
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
- writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
- }
-
- /**
- * Writes message to the socket.
- *
- * @param sock Socket.
- * @param msg Message.
- * @param bout Byte array output stream.
- * @throws IOException If IO failed or write timed out.
- * @throws IgniteCheckedException If marshalling failed.
- */
- @SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
- throws IOException, IgniteCheckedException {
- assert sock != null;
- assert msg != null;
- assert bout != null;
-
- // Marshall message first to perform only write after.
- marsh.marshal(msg, bout);
-
- SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
-
- sockTimeoutWorker.addTimeoutObject(obj);
-
- IOException err = null;
-
- try {
- OutputStream out = sock.getOutputStream();
-
- bout.writeTo(out);
-
- out.flush();
- }
- catch (IOException e) {
- err = e;
- }
- finally {
- boolean cancelled = obj.cancel();
-
- if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
-
- // Throw original exception.
- if (err != null)
- throw err;
-
- if (!cancelled)
- throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
- }
- }
-
- /**
- * Writes response to the socket.
- *
- * @param sock Socket.
- * @param res Integer response.
- * @throws IOException If IO failed or write timed out.
- */
- @SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(Socket sock, int res) throws IOException {
- assert sock != null;
-
- SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
-
- sockTimeoutWorker.addTimeoutObject(obj);
-
- OutputStream out = sock.getOutputStream();
-
- IOException err = null;
-
- try {
- out.write(res);
-
- out.flush();
- }
- catch (IOException e) {
- err = e;
- }
- finally {
- boolean cancelled = obj.cancel();
-
- if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
-
- // Throw original exception.
- if (err != null)
- throw err;
-
- if (!cancelled)
- throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
- }
- }
-
- /**
- * Reads message from the socket limiting read time.
- *
- * @param sock Socket.
- * @param in Input stream (in case socket stream was wrapped).
- * @param timeout Socket timeout for this operation.
- * @return Message.
- * @throws IOException If IO failed or read timed out.
- * @throws IgniteCheckedException If unmarshalling failed.
- */
- protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
- assert sock != null;
-
- int oldTimeout = sock.getSoTimeout();
-
- try {
- sock.setSoTimeout((int)timeout);
-
- return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
- }
- catch (IOException | IgniteCheckedException e) {
- if (X.hasCause(e, SocketTimeoutException.class))
- LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
- "in long GC pauses on remote node. Current timeout: " + timeout + '.');
-
- throw e;
- }
- finally {
- // Quietly restore timeout.
- try {
- sock.setSoTimeout(oldTimeout);
- }
- catch (SocketException ignored) {
- // No-op.
- }
- }
- }
-
- /**
- * Reads message delivery receipt from the socket.
- *
- * @param sock Socket.
- * @param timeout Socket timeout for this operation.
- * @return Receipt.
- * @throws IOException If IO failed or read timed out.
- */
- protected int readReceipt(Socket sock, long timeout) throws IOException {
- assert sock != null;
-
- int oldTimeout = sock.getSoTimeout();
-
- try {
- sock.setSoTimeout((int)timeout);
-
- int res = sock.getInputStream().read();
-
- if (res == -1)
- throw new EOFException();
-
- return res;
- }
- catch (SocketTimeoutException e) {
- LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
- "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
- "configuration property). Will retry to send message with increased timeout. " +
- "Current timeout: " + timeout + '.');
-
- stats.onAckTimeout();
-
- throw e;
- }
- finally {
- // Quietly restore timeout.
- try {
- sock.setSoTimeout(oldTimeout);
- }
- catch (SocketException ignored) {
- // No-op.
- }
- }
- }
-
- /**
- * Resolves addresses registered in the IP finder, removes duplicates and local host
- * address and returns the collection of.
- *
- * @return Resolved addresses without duplicates and local address (potentially
- * empty but never null).
- * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
- */
- protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
- List<InetSocketAddress> res = new ArrayList<>();
-
- Collection<InetSocketAddress> addrs;
-
- // Get consistent addresses collection.
- while (true) {
- try {
- addrs = registeredAddresses();
-
- break;
- }
- catch (IgniteSpiException e) {
- LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
- "(retrying every 2000 ms).");
- }
-
- try {
- U.sleep(2000);
- }
- catch (IgniteInterruptedCheckedException e) {
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
- }
-
- for (InetSocketAddress addr : addrs) {
- assert addr != null;
-
- try {
- InetSocketAddress resolved = addr.isUnresolved() ?
- new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
-
- if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
- res.add(resolved);
- }
- catch (UnknownHostException ignored) {
- LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
-
- // Add address in any case.
- res.add(addr);
- }
- }
-
- if (!res.isEmpty())
- Collections.shuffle(res);
-
- return res;
- }
-
- /**
- * Gets addresses registered in the IP finder, initializes addresses having no
- * port (or 0 port) with {@link #DFLT_PORT}.
- *
- * @return Registered addresses.
- * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
- */
- protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
- Collection<InetSocketAddress> res = new ArrayList<>();
-
- for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
- if (addr.getPort() == 0) {
- // TcpDiscoveryNode.discoveryPort() returns an correct port for a server node and 0 for client node.
- int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT;
-
- addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), port) :
- new InetSocketAddress(addr.getAddress(), port);
- }
-
- res.add(addr);
- }
-
- return res;
- }
-
- /**
- * @param msg Message.
- * @return Error.
- */
- protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
- assert msg != null;
-
- return new IgniteSpiException("Local node has the same ID as existing node in topology " +
- "(fix configuration and restart local node) [localNode=" + locNode +
- ", existingNode=" + msg.node() + ']');
- }
-
- /**
- * @param msg Message.
- * @return Error.
- */
- protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
- assert msg != null;
-
- return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" +
- msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
- }
-
- /**
- * @param msg Message.
- * @return Error.
- */
- protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
- assert msg != null;
-
- return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
- new IgniteSpiException(msg.error());
- }
-
- /**
- * @param msg Message.
- * @return Whether delivery of the message is ensured.
- */
- protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
- return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null;
- }
-
- /**
- * @param msg Failed message.
- * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
- * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
- * and create separate message for failed version check with next major release.
- */
- @Deprecated
- private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
- return msg.error().contains("versions are not compatible");
- }
-
- /**
- * @param joiningNodeID Joining node ID.
- * @param nodeId Remote node ID for which data is provided.
- * @param data Collection of marshalled discovery data objects from different components.
- * @param clsLdr Class loader for discovery data unmarshalling.
- */
- protected void onExchange(UUID joiningNodeID,
- UUID nodeId,
- Map<Integer, byte[]> data,
- ClassLoader clsLdr)
- {
- Map<Integer, Serializable> data0 = U.newHashMap(data.size());
-
- for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
- try {
- Serializable compData = marsh.unmarshal(entry.getValue(), clsLdr);
-
- data0.put(entry.getKey(), compData);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal discovery data for component: " + entry.getKey(), e);
- }
- }
-
- exchange.onExchange(joiningNodeID, nodeId, data0);
- }
-
- /**
- * Handles sockets timeouts.
- */
- protected class SocketTimeoutWorker extends IgniteSpiThread {
- /** Time-based sorted set for timeout objects. */
- private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
- new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
- @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
- long time1 = o1.endTime();
- long time2 = o2.endTime();
-
- long id1 = o1.id();
- long id2 = o2.id();
-
- return time1 < time2 ? -1 : time1 > time2 ? 1 :
- id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
- }
- });
-
- /** Mutex. */
- private final Object mux0 = new Object();
-
- /**
- *
- */
- SocketTimeoutWorker() {
- super(gridName, "tcp-disco-sock-timeout-worker", log);
-
- setPriority(threadPri);
- }
-
- /**
- * @param timeoutObj Timeout object to add.
- */
- @SuppressWarnings({"NakedNotify"})
- public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
- assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
-
- timeoutObjs.add(timeoutObj);
-
- if (timeoutObjs.firstx() == timeoutObj) {
- synchronized (mux0) {
- mux0.notifyAll();
- }
- }
- }
-
- /**
- * @param timeoutObj Timeout object to remove.
- */
- public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
- assert timeoutObj != null;
-
- timeoutObjs.remove(timeoutObj);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Socket timeout worker has been started.");
-
- while (!isInterrupted()) {
- long now = U.currentTimeMillis();
-
- for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
- SocketTimeoutObject timeoutObj = iter.next();
-
- if (timeoutObj.endTime() <= now) {
- iter.remove();
-
- if (timeoutObj.onTimeout()) {
- LT.warn(log, null, "Socket write has timed out (consider increasing " +
- "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
-
- stats.onSocketTimeout();
- }
- }
- else
- break;
- }
-
- synchronized (mux0) {
- while (true) {
- // Access of the first element must be inside of
- // synchronization block, so we don't miss out
- // on thread notification events sent from
- // 'addTimeoutObject(..)' method.
- SocketTimeoutObject first = timeoutObjs.firstx();
-
- if (first != null) {
- long waitTime = first.endTime() - U.currentTimeMillis();
-
- if (waitTime > 0)
- mux0.wait(waitTime);
- else
- break;
- }
- else
- mux0.wait(5000);
- }
- }
- }
- }
- }
-
- /**
- * Socket timeout object.
- */
- private static class SocketTimeoutObject {
- /** */
- private static final AtomicLong idGen = new AtomicLong();
-
- /** */
- private final long id = idGen.incrementAndGet();
-
- /** */
- private final Socket sock;
-
- /** */
- private final long endTime;
-
- /** */
- private final AtomicBoolean done = new AtomicBoolean();
-
- /**
- * @param sock Socket.
- * @param endTime End time.
- */
- SocketTimeoutObject(Socket sock, long endTime) {
- assert sock != null;
- assert endTime > 0;
-
- this.sock = sock;
- this.endTime = endTime;
- }
-
- /**
- * @return {@code True} if object has not yet been processed.
- */
- boolean cancel() {
- return done.compareAndSet(false, true);
- }
-
- /**
- * @return {@code True} if object has not yet been canceled.
- */
- boolean onTimeout() {
- if (done.compareAndSet(false, true)) {
- // Close socket - timeout occurred.
- U.closeQuiet(sock);
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @return End time.
- */
- long endTime() {
- return endTime;
- }
-
- /**
- * @return ID.
- */
- long id() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SocketTimeoutObject.class, this);
- }
- }
-
- /**
- * Base class for message workers.
- */
- protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
- /** Pre-allocated output stream (100K). */
- private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
-
- /** Message queue. */
- private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
-
- /** Backed interrupted flag. */
- private volatile boolean interrupted;
-
- /**
- * @param name Thread name.
- */
- protected MessageWorkerAdapter(String name) {
- super(gridName, name, log);
-
- setPriority(threadPri);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');
-
- while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
-
- if (msg == null)
- continue;
-
- processMessage(msg);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void interrupt() {
- interrupted = true;
-
- super.interrupt();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isInterrupted() {
- return interrupted || super.isInterrupted();
- }
-
- /**
- * @return Current queue size.
- */
- int queueSize() {
- return queue.size();
- }
-
- /**
- * Adds message to queue.
- *
- * @param msg Message to add.
- */
- void addMessage(TcpDiscoveryAbstractMessage msg) {
- assert msg != null;
-
- if (msg instanceof TcpDiscoveryHeartbeatMessage)
- queue.addFirst(msg);
- else
- queue.add(msg);
-
- if (log.isDebugEnabled())
- log.debug("Message has been added to queue: " + msg);
- }
-
- /**
- * @param msg Message.
- */
- protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
-
- /**
- * @param sock Socket.
- * @param msg Message.
- * @throws IOException If IO failed.
- * @throws IgniteCheckedException If marshalling failed.
- */
- protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
- throws IOException, IgniteCheckedException {
- bout.reset();
-
- TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
- }
- }
-
- /**
- *
- */
- protected class SocketMultiConnector implements AutoCloseable {
- /** */
- private int connInProgress;
-
- /** */
- private final ExecutorService executor;
-
- /** */
- private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
-
- /**
- * @param addrs Addresses.
- * @param retryCnt Retry count.
- */
- public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
- connInProgress = addrs.size();
-
- executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
-
- completionSrvc = new ExecutorCompletionService<>(executor);
-
- for (final InetSocketAddress addr : addrs) {
- completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
- @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
- Exception ex = null;
- Socket sock = null;
-
- for (int i = 0; i < retryCnt; i++) {
- if (Thread.currentThread().isInterrupted())
- return null; // Executor is shutdown.
-
- try {
- sock = openSocket(addr);
-
- break;
- }
- catch (Exception e) {
- ex = e;
- }
- }
-
- return new GridTuple3<>(addr, sock, ex);
- }
- });
- }
- }
-
- /**
- *
- */
- @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
- if (connInProgress == 0)
- return null;
-
- try {
- Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();
-
- connInProgress--;
-
- return fut.get();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
- catch (ExecutionException e) {
- throw new IgniteSpiException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- List<Runnable> unstartedTasks = executor.shutdownNow();
-
- connInProgress -= unstartedTasks.size();
-
- if (connInProgress > 0) {
- Thread thread = new Thread(new Runnable() {
- @Override public void run() {
- try {
- executor.awaitTermination(5, TimeUnit.MINUTES);
-
- Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;
-
- while ((fut = completionSrvc.poll()) != null) {
- try {
- GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();
-
- if (tuple3 != null)
- IgniteUtils.closeQuiet(tuple3.get2());
- }
- catch (ExecutionException ignore) {
-
- }
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new RuntimeException(e);
- }
- }
- });
-
- thread.setDaemon(true);
-
- thread.start();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
index df9d0f4..f338fab 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -272,4 +272,13 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
*/
@MXBeanDescription("Dump debug info.")
public void dumpDebugInfo();
+
+ /**
+ * Whether or not discovery is started in client mode.
+ *
+ * @return {@code true} if node is in client mode.
+ * @throws IllegalStateException If discovery SPI is not started.
+ */
+ @MXBeanDescription("Client mode.")
+ public boolean isClientMode() throws IllegalStateException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index bb8f051..cc61c9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -448,11 +448,8 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
ClusterMetrics metrics = this.metrics;
- if (metrics != null) {
- mtr = new byte[ClusterMetricsSnapshot.METRICS_SIZE];
-
- ClusterMetricsSnapshot.serialize(mtr, 0, metrics);
- }
+ if (metrics != null)
+ mtr = ClusterMetricsSnapshot.serialize(metrics);
U.writeByteArray(out, mtr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index e866504..e9eaa1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -32,7 +32,7 @@ import java.util.concurrent.locks.*;
*/
public class TcpDiscoveryNodesRing {
/** Visible nodes filter. */
- private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
+ public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
@Override public boolean apply(TcpDiscoveryNode node) {
return node.visible();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
index 51ad7b4..95758e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
@@ -31,7 +31,7 @@ public interface TcpDiscoveryIpFinder {
* method is completed, SPI context can be stored for future access.
*
* @param spiCtx Spi context.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException;
@@ -46,7 +46,7 @@ public interface TcpDiscoveryIpFinder {
* Initializes addresses discovery SPI binds to.
*
* @param addrs Addresses discovery SPI binds to.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
@@ -54,7 +54,7 @@ public interface TcpDiscoveryIpFinder {
* Gets all addresses registered in this finder.
*
* @return All known addresses, potentially empty, but never {@code null}.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException;
@@ -76,7 +76,7 @@ public interface TcpDiscoveryIpFinder {
* is already registered.
*
* @param addrs Addresses to register. Not {@code null} and not empty.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
@@ -87,7 +87,7 @@ public interface TcpDiscoveryIpFinder {
* registered quietly (just no-op).
*
* @param addrs Addresses to unregister. Not {@code null} and not empty.
- * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+ * @throws IgniteSpiException In case of error.
*/
public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 6cf06ab..a992620 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -26,6 +26,8 @@ import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.jetbrains.annotations.*;
@@ -254,6 +256,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
"(it is recommended in production to specify at least one address in " +
"TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)");
+ boolean clientMode;
+
+ if (ignite != null) { // Can be null if used in tests without starting Ignite.
+ DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi();
+
+ if (!(discoSpi instanceof TcpDiscoverySpi))
+ throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " +
+ "TcpDiscoverySpi: " + discoSpi);
+
+ clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode();
+ }
+ else
+ clientMode = false;
+
InetAddress mcastAddr;
try {
@@ -296,7 +312,8 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
if (!addr.isLoopbackAddress()) {
try {
- addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
+ if (!clientMode)
+ addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
reqItfs.add(addr);
}
@@ -309,20 +326,24 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
}
}
- if (addrSnds.isEmpty()) {
- try {
- // Create non-bound socket if local host is loopback or failed to create sockets explicitly
- // bound to interfaces.
- addrSnds.add(new AddressSender(mcastAddr, null, addrs));
- }
- catch (IOException e) {
- throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
- ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+ if (!clientMode) {
+ if (addrSnds.isEmpty()) {
+ try {
+ // Create non-bound socket if local host is loopback or failed to create sockets explicitly
+ // bound to interfaces.
+ addrSnds.add(new AddressSender(mcastAddr, null, addrs));
+ }
+ catch (IOException e) {
+ throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
+ ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+ }
}
- }
- for (AddressSender addrSnd :addrSnds)
- addrSnd.start();
+ for (AddressSender addrSnd : addrSnds)
+ addrSnd.start();
+ }
+ else
+ assert addrSnds.isEmpty() : addrSnds;
Collection<InetSocketAddress> ret;
@@ -495,11 +516,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
/** {@inheritDoc} */
@Override public void close() {
- for (AddressSender addrSnd : addrSnds)
- U.interrupt(addrSnd);
+ if (addrSnds != null) {
+ for (AddressSender addrSnd : addrSnds)
+ U.interrupt(addrSnd);
- for (AddressSender addrSnd : addrSnds)
- U.join(addrSnd, log);
+ for (AddressSender addrSnd : addrSnds)
+ U.join(addrSnd, log);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 1a00359..145b518 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -52,9 +52,6 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
/** Topology version. */
private long topVer;
- /** Destination client node ID. */
- private UUID destClientNodeId;
-
/** Flags. */
@GridToStringExclude
private int flags;
@@ -178,20 +175,6 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
}
/**
- * @return Destination client node ID.
- */
- public UUID destinationClientNodeId() {
- return destClientNodeId;
- }
-
- /**
- * @param destClientNodeId Destination client node ID.
- */
- public void destinationClientNodeId(UUID destClientNodeId) {
- this.destClientNodeId = destClientNodeId;
- }
-
- /**
* @return Pending message index.
*/
public short pendingIndex() {
@@ -232,6 +215,13 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
flags &= ~mask;
}
+ /**
+ * @return {@code true} if message must be added to head of queue.
+ */
+ public boolean highPriority() {
+ return false;
+ }
+
/** {@inheritDoc} */
@Override public final boolean equals(Object obj) {
if (this == obj)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
new file mode 100644
index 0000000..95ac340
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Heartbeat message.
+ * <p>
+ * Client sends his hearbeats in this message.
+ */
+public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final byte[] metrics;
+
+ /**
+ * Constructor.
+ *
+ * @param creatorNodeId Creator node.
+ */
+ public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) {
+ super(creatorNodeId);
+
+ this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+ }
+
+ /**
+ * Gets metrics map.
+ *
+ * @return Metrics map.
+ */
+ public ClusterMetrics metrics() {
+ return ClusterMetricsSnapshot.deserialize(metrics, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean highPriority() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryClientHeartbeatMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
new file mode 100644
index 0000000..f9f164d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping request.
+ */
+public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Pinged client node ID. */
+ private final UUID nodeToPing;
+
+ /**
+ * @param creatorNodeId Creator node ID.
+ * @param nodeToPing Pinged client node ID.
+ */
+ public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID nodeToPing) {
+ super(creatorNodeId);
+
+ this.nodeToPing = nodeToPing;
+ }
+
+ /**
+ * @return Pinged client node ID.
+ */
+ @Nullable public UUID nodeToPing() {
+ return nodeToPing;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
new file mode 100644
index 0000000..26a2b00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping request.
+ */
+public class TcpDiscoveryClientPingResponse extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Pinged client node ID. */
+ private final UUID nodeToPing;
+
+ /** */
+ private final boolean res;
+
+ /**
+ * @param creatorNodeId Creator node ID.
+ * @param nodeToPing Pinged client node ID.
+ */
+ public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID nodeToPing, boolean res) {
+ super(creatorNodeId);
+
+ this.nodeToPing = nodeToPing;
+ this.res = res;
+ }
+
+ /**
+ * @return Pinged client node ID.
+ */
+ @Nullable public UUID nodeToPing() {
+ return nodeToPing;
+ }
+
+ /**
+ * @return Result of ping.
+ */
+ public boolean result() {
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 4e42f2d..0739c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,29 +18,34 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.spi.discovery.*;
+import org.jetbrains.annotations.*;
-import java.io.*;
import java.util.*;
/**
* Wrapped for custom message.
*/
+@TcpDiscoveryRedirectToClient
@TcpDiscoveryEnsureDelivery
public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage {
/** */
private static final long serialVersionUID = 0L;
/** */
- private transient Serializable msg;
+ private transient volatile DiscoverySpiCustomMessage msg;
/** */
- private final byte[] msgBytes;
+ private byte[] msgBytes;
/**
* @param creatorNodeId Creator node id.
+ * @param msg Message.
* @param msgBytes Serialized message.
*/
- public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, Serializable msg, byte[] msgBytes) {
+ public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
+ @NotNull byte[] msgBytes) {
super(creatorNodeId);
this.msg = msg;
@@ -48,17 +53,33 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
}
/**
- * @return Message.
+ * @return Serialized message.
*/
- public Serializable message() {
- return msg;
+ public byte[] messageBytes() {
+ return msgBytes;
}
/**
- * @return Serialized message.
+ * @param msg Message.
+ * @param msgBytes Serialized message.
*/
- public byte[] messageBytes() {
- return msgBytes;
+ public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) {
+ this.msg = msg;
+ this.msgBytes = msgBytes;
+ }
+
+ /**
+ * @return Deserialized message,
+ * @throws java.lang.Throwable if unmarshal failed.
+ */
+ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable {
+ if (msg == null) {
+ msg = marsh.unmarshal(msgBytes, U.gridClassLoader());
+
+ assert msg != null;
+ }
+
+ return msg;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
index bafde9f..f721401 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -58,13 +58,6 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
private final Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics = new HashMap<>();
/**
- * Public default no-arg constructor for {@link Externalizable} interface.
- */
- public TcpDiscoveryHeartbeatMessage() {
- // No-op.
- }
-
- /**
* Constructor.
*
* @param creatorNodeId Creator node.
@@ -211,22 +204,13 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", super.toString());
+ @Override public boolean highPriority() {
+ return true;
}
- /**
- * @param metrics Metrics.
- * @return Serialized metrics.
- */
- private static byte[] serializeMetrics(ClusterMetrics metrics) {
- assert metrics != null;
-
- byte[] buf = new byte[ClusterMetricsSnapshot.METRICS_SIZE];
-
- ClusterMetricsSnapshot.serialize(buf, 0, metrics);
-
- return buf;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", super.toString());
}
/**
@@ -273,7 +257,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
public MetricsSet(ClusterMetrics metrics) {
assert metrics != null;
- this.metrics = serializeMetrics(metrics);
+ this.metrics = ClusterMetricsSnapshot.serialize(metrics);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 5a71eb3..1d974e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -17,7 +17,9 @@
package org.apache.ignite.spi.discovery.tcp.messages;
+import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -34,6 +36,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
private final UUID nodeId;
/**
+ * Client node can not get discovery data from TcpDiscoveryNodeAddedMessage, we have to pass discovery data in
+ * TcpDiscoveryNodeAddFinishedMessage
+ */
+ @GridToStringExclude
+ private Map<UUID, Map<Integer, byte[]>> clientDiscoData;
+
+ /** */
+ @GridToStringExclude
+ private Map<String, Object> clientNodeAttrs;
+
+ /**
* Constructor.
*
* @param creatorNodeId ID of the creator node (coordinator).
@@ -54,6 +67,36 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
return nodeId;
}
+ /**
+ * @return Discovery data for joined client.
+ */
+ public Map<UUID, Map<Integer, byte[]>> clientDiscoData() {
+ return clientDiscoData;
+ }
+
+ /**
+ * @param clientDiscoData Discovery data for joined client.
+ */
+ public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) {
+ this.clientDiscoData = clientDiscoData;
+
+ assert clientDiscoData == null || !clientDiscoData.containsKey(nodeId);
+ }
+
+ /**
+ * @return Client node attributes.
+ */
+ public Map<String, Object> clientNodeAttributes() {
+ return clientNodeAttrs;
+ }
+
+ /**
+ * @param clientNodeAttrs New client node attributes.
+ */
+ public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) {
+ this.clientNodeAttrs = clientNodeAttrs;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index a9303f3..2a14158 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -148,7 +148,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
*
* @return Map with topology snapshots history.
*/
- @Nullable public Map<Long, Collection<ClusterNode>> topologyHistory() {
+ public Map<Long, Collection<ClusterNode>> topologyHistory() {
return topHist;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
index de5b0a7..f17c91b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -47,4 +48,9 @@ public class TcpDiscoveryPingRequest extends TcpDiscoveryAbstractMessage {
@Nullable public UUID clientNodeId() {
return clientNodeId;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryPingRequest.class, this, "super", super.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
index 6396764..02b2d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
@@ -17,7 +17,8 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import java.io.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
import java.util.*;
/**
@@ -31,13 +32,6 @@ public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage {
private boolean clientExists;
/**
- * For {@link Externalizable}.
- */
- public TcpDiscoveryPingResponse() {
- // No-op.
- }
-
- /**
* @param creatorNodeId Creator node ID.
*/
public TcpDiscoveryPingResponse(UUID creatorNodeId) {
@@ -57,4 +51,9 @@ public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage {
public boolean clientExists() {
return clientExists;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryPingResponse.class, this, "super", super.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
index 8dba0db..a47a17f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
@@ -45,7 +45,7 @@ class RoundRobinGlobalLoadBalancer {
private final IgniteLogger log;
/** Current snapshot of nodes which participated in load balancing. */
- private volatile GridNodeList nodeList = new GridNodeList(0, null);
+ private volatile GridNodeList nodeList = new GridNodeList(0, new ArrayList<UUID>(0));
/** Mutex for updating current topology. */
private final Object mux = new Object();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e7db285..7a88426 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -387,15 +387,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
Space space = space(spaceName, false);
- if (space == null)
- return;
-
- byte[] val = space.remove(key, c != null);
+ byte[] val = space == null ? null : space.remove(key, c != null);
if (c != null)
c.apply(val);
- notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
+ if (space != null)
+ notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
index 79b42b2..0b60ff7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
@@ -5,8 +5,12 @@
*/
/*
- * The initial version of this file was copied from JSR-166:
- * http://gee.cs.oswego.edu/dl/concurrency-interest/
+ * The latest version of the file was copied from the following CVS repository:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+ *
+ * Corresponding commit version in CVS repository is unknown (lost on our side).
+ * On the other hand we can't simply synch the latest version from CVS here, because Ignite uses functionality that
+ * is no longer supported.
*/
package org.jsr166;