You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/31 16:22:36 UTC
[02/18] ignite git commit: IGNITE-4564: All setters on public
configuration now return "this" instance to allow convenient chaining. This
closes #1449.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 42879b7..f13f1f2 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -106,7 +106,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -122,6 +121,7 @@ import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
@@ -241,8 +241,7 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
-public class TcpCommunicationSpi extends IgniteSpiAdapter
- implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message> {
/** IPC error message. */
public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
"(switching to TCP, may be slower).";
@@ -1100,12 +1099,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Sets address resolver.
*
* @param addrRslvr Address resolver.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setAddressResolver(AddressResolver addrRslvr) {
+ public TcpCommunicationSpi setAddressResolver(AddressResolver addrRslvr) {
// Injection should not override value already set by Spring or user.
if (this.addrRslvr == null)
this.addrRslvr = addrRslvr;
+
+ return this;
}
/**
@@ -1130,16 +1132,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
* @param locAddr IP address. Default value is any available local
* IP address.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setLocalAddress(String locAddr) {
+ public TcpCommunicationSpi setLocalAddress(String locAddr) {
// Injection should not override value already set by Spring or user.
if (this.locAddr == null)
this.locAddr = locAddr;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public String getLocalAddress() {
+ /**
+ * See {@link #setLocalAddress(String)}.
+ *
+ * @return Grid node IP address.
+ */
+ public String getLocalAddress() {
return locAddr;
}
@@ -1149,14 +1158,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default value is {@link #DFLT_PORT}.
*
* @param locPort Port number.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setLocalPort(int locPort) {
+ public TcpCommunicationSpi setLocalPort(int locPort) {
this.locPort = locPort;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getLocalPort() {
+ /**
+ * See {@link #setLocalPort(int)}.
+ *
+ * @return Port number.
+ */
+ public int getLocalPort() {
return locPort;
}
@@ -1175,19 +1191,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default value is {@link #DFLT_PORT_RANGE}.
*
* @param locPortRange New local port range.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setLocalPortRange(int locPortRange) {
+ public TcpCommunicationSpi setLocalPortRange(int locPortRange) {
this.locPortRange = locPortRange;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getLocalPortRange() {
+ /**
+ * See {@link #setLocalPortRange(int)}.
+ *
+ * @return Local Port range.
+ */
+ public int getLocalPortRange() {
return locPortRange;
}
- /** {@inheritDoc} */
- @Override public boolean isUsePairedConnections() {
+ /**
+ * See {@link #setUsePairedConnections(boolean)}.
+ *
+ * @return {@code true} to use paired connections and {@code false} otherwise.
+ */
+ public boolean isUsePairedConnections() {
return usePairedConnections;
}
@@ -1205,9 +1232,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
* @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise.
* @see #getConnectionsPerNode()
+ * @return {@code this} for chaining.
*/
- public void setUsePairedConnections(boolean usePairedConnections) {
+ public TcpCommunicationSpi setUsePairedConnections(boolean usePairedConnections) {
this.usePairedConnections = usePairedConnections;
+
+ return this;
}
/**
@@ -1217,13 +1247,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
* @param maxConnectionsPerNode Number of connections per node.
* @see #isUsePairedConnections()
+ * @return {@code this} for chaining.
*/
- public void setConnectionsPerNode(int maxConnectionsPerNode) {
+ public TcpCommunicationSpi setConnectionsPerNode(int maxConnectionsPerNode) {
this.connectionsPerNode = maxConnectionsPerNode;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getConnectionsPerNode() {
+ /**
+ * See {@link #setConnectionsPerNode(int)}.
+ *
+ * @return Number of connections per node.
+ */
+ public int getConnectionsPerNode() {
return connectionsPerNode;
}
@@ -1235,14 +1272,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default value is {@link #DFLT_SHMEM_PORT}.
*
* @param shmemPort Port number.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setSharedMemoryPort(int shmemPort) {
+ public TcpCommunicationSpi setSharedMemoryPort(int shmemPort) {
this.shmemPort = shmemPort;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getSharedMemoryPort() {
+ /**
+ * See {@link #setSharedMemoryPort(int)}.
+ *
+ * @return Port number.
+ */
+ public int getSharedMemoryPort() {
return shmemPort;
}
@@ -1253,19 +1297,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default value is {@link #DFLT_IDLE_CONN_TIMEOUT}.
*
* @param idleConnTimeout Maximum idle connection time.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setIdleConnectionTimeout(long idleConnTimeout) {
+ public TcpCommunicationSpi setIdleConnectionTimeout(long idleConnTimeout) {
this.idleConnTimeout = idleConnTimeout;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public long getIdleConnectionTimeout() {
+ /**
+ * See {@link #setIdleConnectionTimeout(long)}.
+ *
+ * @return Maximum idle connection time.
+ */
+ public long getIdleConnectionTimeout() {
return idleConnTimeout;
}
- /** {@inheritDoc} */
- @Override public long getSocketWriteTimeout() {
+ /**
+ * See {@link #setSocketWriteTimeout(long)}.
+ *
+ * @return Socket write timeout for TCP connections.
+ */
+ public long getSocketWriteTimeout() {
return sockWriteTimeout;
}
@@ -1276,14 +1331,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Default to {@link #DFLT_SOCK_WRITE_TIMEOUT}.
*
* @param sockWriteTimeout Socket write timeout for TCP connection.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setSocketWriteTimeout(long sockWriteTimeout) {
+ public TcpCommunicationSpi setSocketWriteTimeout(long sockWriteTimeout) {
this.sockWriteTimeout = sockWriteTimeout;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getAckSendThreshold() {
+ /**
+ * See {@link #setAckSendThreshold(int)}.
+ *
+ * @return Number of received messages after which acknowledgment is sent.
+ */
+ public int getAckSendThreshold() {
return ackSndThreshold;
}
@@ -1293,14 +1355,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Default to {@link #DFLT_ACK_SND_THRESHOLD}.
*
* @param ackSndThreshold Number of received messages after which acknowledgment is sent.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setAckSendThreshold(int ackSndThreshold) {
+ public TcpCommunicationSpi setAckSendThreshold(int ackSndThreshold) {
this.ackSndThreshold = ackSndThreshold;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getUnacknowledgedMessagesBufferSize() {
+ /**
+ * See {@link #setUnacknowledgedMessagesBufferSize(int)}.
+ *
+ * @return Maximum number of unacknowledged messages.
+ */
+ public int getUnacknowledgedMessagesBufferSize() {
return unackedMsgsBufSize;
}
@@ -1310,17 +1379,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* closed and reconnect is attempted.
*
* @param unackedMsgsBufSize Maximum number of unacknowledged messages.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
+ public TcpCommunicationSpi setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
this.unackedMsgsBufSize = unackedMsgsBufSize;
+
+ return this;
}
/**
* Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
*
* @param connBufSize Connection buffer size.
- * @see #setConnectionBufferFlushFrequency(long)
* @deprecated Not used any more.
*/
@Deprecated
@@ -1329,22 +1400,48 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// No-op.
}
- /** {@inheritDoc} */
+ /**
+ * Gets connection buffer size.
+ * <p>
+ * If set to {@code 0} connection buffer is disabled.
+ *
+ * @return Connection buffer size.
+ * @deprecated Not used anymore.
+ */
@Deprecated
- @Override public int getConnectionBufferSize() {
+ public int getConnectionBufferSize() {
return 0;
}
- /** {@inheritDoc} */
+ /**
+ * Sets connection buffer flush frequency.
+ * <p>
+ * Client connections to other nodes in topology use buffered output.
+ * This frequency defines how often system will advise to flush
+ * connection buffer.
+ * <p>
+ * This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}.
+ *
+ * @param connBufFlushFreq Flush frequency.
+ * @see #getConnectionBufferSize()
+ * @deprecated Not used anymore.
+ */
@Deprecated
@IgniteSpiConfiguration(optional = true)
- @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
+ public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
// No-op.
}
- /** {@inheritDoc} */
+ /**
+ * Gets connection buffer flush frequency.
+ * <p>
+ * This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}.
+ *
+ * @return Flush frequency.
+ * @deprecated Not used anymore.
+ */
@Deprecated
- @Override public long getConnectionBufferFlushFrequency() {
+ public long getConnectionBufferFlushFrequency() {
return 0;
}
@@ -1359,16 +1456,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param connTimeout Connect timeout.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setConnectTimeout(long connTimeout) {
+ public TcpCommunicationSpi setConnectTimeout(long connTimeout) {
this.connTimeout = connTimeout;
failureDetectionTimeoutEnabled(false);
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public long getConnectTimeout() {
+ /**
+ * See {@link #setConnectTimeout(long)}.
+ *
+ * @return Connect timeout.
+ */public long getConnectTimeout() {
return connTimeout;
}
@@ -1385,16 +1488,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param maxConnTimeout Maximum connect timeout.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMaxConnectTimeout(long maxConnTimeout) {
+ public TcpCommunicationSpi setMaxConnectTimeout(long maxConnTimeout) {
this.maxConnTimeout = maxConnTimeout;
failureDetectionTimeoutEnabled(false);
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public long getMaxConnectTimeout() {
+ /**
+ * Gets maximum connect timeout.
+ *
+ * @return Maximum connect timeout.
+ */
+ public long getMaxConnectTimeout() {
return maxConnTimeout;
}
@@ -1407,16 +1517,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param reconCnt Maximum number of reconnection attempts.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setReconnectCount(int reconCnt) {
+ public TcpCommunicationSpi setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
failureDetectionTimeoutEnabled(false);
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getReconnectCount() {
+ /**
+ * Gets maximum number of reconnect attempts used when establishing connection
+ * with remote nodes.
+ *
+ * @return Reconnects count.
+ */
+ public int getReconnectCount() {
return reconCnt;
}
@@ -1428,32 +1546,46 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default value is {@code true}.
*
* @param directBuf Flag indicates to allocate direct or heap buffer in SPI.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setDirectBuffer(boolean directBuf) {
+ public TcpCommunicationSpi setDirectBuffer(boolean directBuf) {
this.directBuf = directBuf;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public boolean isDirectBuffer() {
+ /**
+ * Gets flag that indicates whether direct or heap allocated buffer is used.
+ *
+ * @return Flag that indicates whether direct or heap allocated buffer is used.
+ */
+ public boolean isDirectBuffer() {
return directBuf;
}
- /** {@inheritDoc} */
- @Override public boolean isDirectSendBuffer() {
+ /**
+ * Gets flag defining whether direct send buffer should be used.
+ *
+ * @return {@code True} if direct buffers should be used.
+ */
+ public boolean isDirectSendBuffer() {
return directSndBuf;
}
/**
* Sets whether to use direct buffer for sending.
- * <p>
+ *
* If not provided default is {@code false}.
*
* @param directSndBuf {@code True} to use direct buffers for send.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setDirectSendBuffer(boolean directSndBuf) {
+ public TcpCommunicationSpi setDirectSendBuffer(boolean directSndBuf) {
this.directSndBuf = directSndBuf;
+
+ return this;
}
/**
@@ -1462,19 +1594,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
*
* @param selectorsCnt Selectors count.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setSelectorsCount(int selectorsCnt) {
+ public TcpCommunicationSpi setSelectorsCount(int selectorsCnt) {
this.selectorsCnt = selectorsCnt;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getSelectorsCount() {
+ /**
+ * See {@link #setSelectorsCount(int)}.
+ *
+ * @return Count of selectors in TCP server.
+ */
+ public int getSelectorsCount() {
return selectorsCnt;
}
- /** {@inheritDoc} */
- @Override public long getSelectorSpins() {
+ /**
+ * See {@link #setSelectorSpins(long)}.
+ *
+ * @return Selector thread busy-loop iterations.
+ */
+ public long getSelectorSpins() {
return selectorSpins;
}
@@ -1484,9 +1627,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
*
* @param selectorSpins Selector thread busy-loop iterations.
+ * @return {@code this} for chaining.
*/
- public void setSelectorSpins(long selectorSpins) {
+ public TcpCommunicationSpi setSelectorSpins(long selectorSpins) {
this.selectorSpins = selectorSpins;
+
+ return this;
}
/**
@@ -1502,14 +1648,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default value is {@link #DFLT_TCP_NODELAY}.
*
* @param tcpNoDelay {@code True} to disable TCP delay.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setTcpNoDelay(boolean tcpNoDelay) {
+ public TcpCommunicationSpi setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public boolean isTcpNoDelay() {
+ /**
+ * Gets value for {@code TCP_NODELAY} socket option.
+ *
+ * @return {@code True} if TCP delay is disabled.
+ */
+ public boolean isTcpNoDelay() {
return tcpNoDelay;
}
@@ -1519,14 +1672,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}.
*
* @param sockRcvBuf Socket receive buffer size.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setSocketReceiveBuffer(int sockRcvBuf) {
+ public TcpCommunicationSpi setSocketReceiveBuffer(int sockRcvBuf) {
this.sockRcvBuf = sockRcvBuf;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getSocketReceiveBuffer() {
+ /**
+ * See {@link #setSocketReceiveBuffer(int)}.
+ *
+ * @return Socket receive buffer size.
+ */
+ public int getSocketReceiveBuffer() {
return sockRcvBuf;
}
@@ -1536,14 +1696,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}.
*
* @param sockSndBuf Socket send buffer size.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setSocketSendBuffer(int sockSndBuf) {
+ public TcpCommunicationSpi setSocketSendBuffer(int sockSndBuf) {
this.sockSndBuf = sockSndBuf;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getSocketSendBuffer() {
+ /**
+ * See {@link #setSocketSendBuffer(int)}.
+ *
+ * @return Socket send buffer size.
+ */
+ public int getSocketSendBuffer() {
return sockSndBuf;
}
@@ -1556,19 +1723,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}.
*
* @param msgQueueLimit Send queue size limit.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMessageQueueLimit(int msgQueueLimit) {
+ public TcpCommunicationSpi setMessageQueueLimit(int msgQueueLimit) {
this.msgQueueLimit = msgQueueLimit;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getMessageQueueLimit() {
+ /**
+ * Gets message queue limit for incoming and outgoing messages.
+ *
+ * @return Send queue size limit.
+ */
+ public int getMessageQueueLimit() {
return msgQueueLimit;
}
- /** {@inheritDoc} */
- @Override public int getSlowClientQueueLimit() {
+ /**
+ * See {@link #setSlowClientQueueLimit(int)}.
+ *
+ * @return Slow client queue limit.
+ */
+ public int getSlowClientQueueLimit() {
return slowClientQueueLimit;
}
@@ -1583,9 +1761,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* which means {@code unlimited}.
*
* @param slowClientQueueLimit Slow client queue limit.
+ * @return {@code this} for chaining.
*/
- public void setSlowClientQueueLimit(int slowClientQueueLimit) {
+ public TcpCommunicationSpi setSlowClientQueueLimit(int slowClientQueueLimit) {
this.slowClientQueueLimit = slowClientQueueLimit;
+
+ return this;
}
/**
@@ -1601,9 +1782,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// No-op.
}
- /** {@inheritDoc} */
+ /**
+ * Gets the minimum number of messages for this SPI, that are buffered
+ * prior to sending.
+ *
+ * @return Minimum buffered message count.
+ * @deprecated Not used anymore.
+ */
@Deprecated
- @Override public int getMinimumBufferedMessageCount() {
+ public int getMinimumBufferedMessageCount() {
return 0;
}
@@ -1656,8 +1843,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
rcvdBytesCnt.add(-rcvdBytesCnt.sum());
}
- /** {@inheritDoc} */
- @Override public void dumpStats() {
+ /**
+ * Dumps SPI per-connection stats to logs.
+ */
+ public void dumpStats() {
IgniteLogger log = this.log;
if (log != null) {
@@ -1879,7 +2068,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " +
"due to message queues growth on sender and receiver sides.");
- registerMBean(igniteInstanceName, this, TcpCommunicationSpiMBean.class);
+ registerMBean(igniteInstanceName, new TcpCommunicationSpiMBeanImpl(this), TcpCommunicationSpiMBean.class);
connectGate = new ConnectGateway();
@@ -3424,6 +3613,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/** {@inheritDoc} */
+ @Override public TcpCommunicationSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpCommunicationSpi.class, this);
}
@@ -4562,4 +4758,178 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
int connectionIndex();
}
+
+ /**
+ * MBean implementation for TcpCommunicationSpi.
+ */
+ private class TcpCommunicationSpiMBeanImpl extends IgniteSpiMBeanAdapter implements TcpCommunicationSpiMBean {
+ /** @param spiAdapter SPI adapter handed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ TcpCommunicationSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getLocalAddress() {
+ return TcpCommunicationSpi.this.getLocalAddress();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getLocalPort() {
+ return TcpCommunicationSpi.this.getLocalPort();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getLocalPortRange() {
+ return TcpCommunicationSpi.this.getLocalPortRange();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isUsePairedConnections() {
+ return TcpCommunicationSpi.this.isUsePairedConnections();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getConnectionsPerNode() {
+ return TcpCommunicationSpi.this.getConnectionsPerNode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSharedMemoryPort() {
+ return TcpCommunicationSpi.this.getSharedMemoryPort();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getIdleConnectionTimeout() {
+ return TcpCommunicationSpi.this.getIdleConnectionTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSocketWriteTimeout() {
+ return TcpCommunicationSpi.this.getSocketWriteTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getAckSendThreshold() {
+ return TcpCommunicationSpi.this.getAckSendThreshold();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getUnacknowledgedMessagesBufferSize() {
+ return TcpCommunicationSpi.this.getUnacknowledgedMessagesBufferSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getConnectTimeout() {
+ return TcpCommunicationSpi.this.getConnectTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaxConnectTimeout() {
+ return TcpCommunicationSpi.this.getMaxConnectTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getReconnectCount() {
+ return TcpCommunicationSpi.this.getReconnectCount();
+ }
+
+ /** {@inheritDoc} */
+ @Deprecated
+ @Override public int getConnectionBufferSize() {
+ return TcpCommunicationSpi.this.getConnectionBufferSize();
+ }
+
+ /** {@inheritDoc} */
+ @Deprecated
+ @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
+ TcpCommunicationSpi.this.setConnectionBufferFlushFrequency(connBufFlushFreq);
+ }
+
+ /** {@inheritDoc} */
+ @Deprecated
+ @Override public long getConnectionBufferFlushFrequency() {
+ return TcpCommunicationSpi.this.getConnectionBufferFlushFrequency();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDirectBuffer() {
+ return TcpCommunicationSpi.this.isDirectBuffer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDirectSendBuffer() {
+ return TcpCommunicationSpi.this.isDirectSendBuffer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSelectorsCount() {
+ return TcpCommunicationSpi.this.getSelectorsCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSelectorSpins() {
+ return TcpCommunicationSpi.this.getSelectorSpins();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTcpNoDelay() {
+ return TcpCommunicationSpi.this.isTcpNoDelay();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSocketReceiveBuffer() {
+ return TcpCommunicationSpi.this.getSocketReceiveBuffer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSocketSendBuffer() {
+ return TcpCommunicationSpi.this.getSocketSendBuffer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMessageQueueLimit() {
+ return TcpCommunicationSpi.this.getMessageQueueLimit();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSlowClientQueueLimit() {
+ return TcpCommunicationSpi.this.getSlowClientQueueLimit();
+ }
+
+ /** {@inheritDoc} */
+ @Deprecated
+ @Override public int getMinimumBufferedMessageCount() {
+ return TcpCommunicationSpi.this.getMinimumBufferedMessageCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dumpStats() {
+ TcpCommunicationSpi.this.dumpStats();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSentMessagesCount() {
+ return TcpCommunicationSpi.this.getSentMessagesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSentBytesCount() {
+ return TcpCommunicationSpi.this.getSentBytesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getReceivedMessagesCount() {
+ return TcpCommunicationSpi.this.getReceivedMessagesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getReceivedBytesCount() {
+ return TcpCommunicationSpi.this.getReceivedBytesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getOutboundMessagesQueueSize() {
+ return TcpCommunicationSpi.this.getOutboundMessagesQueueSize();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java
index 66b715a..9d46737 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java
@@ -35,6 +35,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.deployment.DeploymentListener;
import org.apache.ignite.spi.deployment.DeploymentResource;
@@ -66,7 +67,7 @@ import org.jsr166.ConcurrentLinkedHashMap;
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
@IgnoreIfPeerClassLoadingDisabled
-public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSpi, LocalDeploymentSpiMBean {
+public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSpi {
/** */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@LoggerResource
@@ -76,7 +77,7 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp
private ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs =
new ConcurrentLinkedHashMap<>(16, 0.75f, 64);
- /** Deployment SPI listener. */
+ /** Deployment SPI listener. */
private volatile DeploymentListener lsnr;
/** {@inheritDoc} */
@@ -84,7 +85,7 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp
// Start SPI start stopwatch.
startStopwatch();
- registerMBean(igniteInstanceName, this, LocalDeploymentSpiMBean.class);
+ registerMBean(igniteInstanceName, new LocalDeploymentSpiMBeanImpl(this), LocalDeploymentSpiMBean.class);
if (log.isDebugEnabled())
log.debug(startInfo());
@@ -395,7 +396,24 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp
}
/** {@inheritDoc} */
+ @Override public LocalDeploymentSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(LocalDeploymentSpi.class, this);
}
+
+ /**
+ * MBean implementation for LocalDeploymentSpi.
+ */
+ private class LocalDeploymentSpiMBeanImpl extends IgniteSpiMBeanAdapter implements LocalDeploymentSpiMBean {
+ /** @param spiAdapter SPI adapter handed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ LocalDeploymentSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e8b937a..19244dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -70,6 +70,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
@@ -217,7 +218,7 @@ import org.jetbrains.annotations.Nullable;
@IgniteSpiMultipleInstancesSupport(true)
@DiscoverySpiOrderSupport(true)
@DiscoverySpiHistorySupport(true)
-public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
@@ -403,18 +404,30 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** */
protected IgniteSpiContext spiCtx;
- /** {@inheritDoc} */
- @Override public String getSpiState() {
+ /**
+ * Gets current SPI state.
+ *
+ * @return Current SPI state.
+ */
+ public String getSpiState() {
return impl.getSpiState();
}
- /** {@inheritDoc} */
- @Override public int getMessageWorkerQueueSize() {
+ /**
+ * Gets message worker queue current size.
+ *
+ * @return Message worker queue current size.
+ */
+ public int getMessageWorkerQueueSize() {
return impl.getMessageWorkerQueueSize();
}
- /** {@inheritDoc} */
- @Nullable @Override public UUID getCoordinator() {
+ /**
+ * Gets current coordinator.
+ *
+ * @return Current coordinator ID (may be {@code null}).
+ */
+ public UUID getCoordinator() {
return impl.getCoordinator();
}
@@ -453,8 +466,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
impl.failNode(nodeId, warning);
}
- /** {@inheritDoc} */
- @Override public void dumpDebugInfo() {
+ /**
+ * Dumps debug info using configured logger.
+ */
+ public void dumpDebugInfo() {
impl.dumpDebugInfo(log);
}
@@ -580,8 +595,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return addrRslvr;
}
- /** {@inheritDoc} */
- @Override public int getReconnectCount() {
+ /**
+ * Gets number of connection attempts.
+ *
+ * @return Number of connection attempts.
+ */
+ public int getReconnectCount() {
return reconCnt;
}
@@ -608,8 +627,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public long getMaxAckTimeout() {
+ /**
+ * Gets maximum message acknowledgement timeout.
+ *
+ * @return Maximum message acknowledgement timeout.
+ */
+ public long getMaxAckTimeout() {
return maxAckTimeout;
}
@@ -639,8 +662,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public int getLocalPort() {
+ /**
+ * Gets local TCP port SPI listens to.
+ *
+ * @return Local port, or {@code 0} if the local node has not been set yet.
+ */
+ public int getLocalPort() {
TcpDiscoveryNode locNode0 = locNode;
return locNode0 != null ? locNode0.discoveryPort() : 0;
@@ -663,8 +690,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public int getLocalPortRange() {
+ /**
+ * Gets local TCP port range.
+ *
+ * @return Local port range.
+ */
+ public int getLocalPortRange() {
return locPortRange;
}
@@ -689,8 +720,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public int getMaxMissedHeartbeats() {
+ /**
+ * Gets max heartbeats count node can miss without initiating status check.
+ *
+ * @return Max missed heartbeats.
+ */
+ public int getMaxMissedHeartbeats() {
return maxMissedHbs;
}
@@ -711,8 +746,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public int getMaxMissedClientHeartbeats() {
+ /**
+ * Gets max heartbeats count node can miss without failing client node.
+ *
+ * @return Max missed client heartbeats.
+ */
+ public int getMaxMissedClientHeartbeats() {
return maxMissedClientHbs;
}
@@ -731,8 +770,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public long getStatisticsPrintFrequency() {
+ /**
+ * Gets statistics print frequency.
+ *
+ * @return Statistics print frequency in milliseconds.
+ */
+ public long getStatisticsPrintFrequency() {
return statsPrintFreq;
}
@@ -755,8 +798,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public long getIpFinderCleanFrequency() {
+ /**
+ * Gets IP finder clean frequency.
+ *
+ * @return IP finder clean frequency.
+ */
+ public long getIpFinderCleanFrequency() {
return ipFinderCleanFreq;
}
@@ -862,8 +909,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return this;
}
- /** {@inheritDoc} */
- @Override public long getJoinTimeout() {
+ /**
+ * Gets join timeout.
+ *
+ * @return Join timeout.
+ */
+ public long getJoinTimeout() {
return joinTimeout;
}
@@ -964,6 +1015,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
+ * Gets ID of the local node.
+ *
+ * @return ID of the local node.
+ */
+ public UUID getLocalNodeId() {
+ return ignite.cluster().localNode().id();
+ }
+
+ /**
* @param srvPort Server port.
* @param addExtAddrAttr If {@code true} adds {@link #ATTR_EXT_ADDRS} attribute.
*/
@@ -1061,93 +1121,164 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return F.<Object>asList(ipFinder);
}
- /** {@inheritDoc} */
- @Override public long getSocketTimeout() {
+ /**
+ * Gets socket timeout.
+ *
+ * @return Socket timeout.
+ */
+ public long getSocketTimeout() {
return sockTimeout;
}
- /** {@inheritDoc} */
- @Override public long getAckTimeout() {
+ /**
+ * Gets message acknowledgement timeout.
+ *
+ * @return Message acknowledgement timeout.
+ */
+ public long getAckTimeout() {
return ackTimeout;
}
- /** {@inheritDoc} */
- @Override public long getNetworkTimeout() {
+ /**
+ * Gets network timeout.
+ *
+ * @return Network timeout.
+ */
+ public long getNetworkTimeout() {
return netTimeout;
}
- /** {@inheritDoc} */
- @Override public int getThreadPriority() {
+ /**
+ * Gets thread priority. All threads within SPI will be started with it.
+ *
+ * @return Thread priority.
+ */
+ public int getThreadPriority() {
return threadPri;
}
- /** {@inheritDoc} */
- @Override public long getHeartbeatFrequency() {
+ /**
+ * Gets delay between heartbeat messages sent by coordinator.
+ *
+ * @return Time period in milliseconds.
+ */
+ public long getHeartbeatFrequency() {
return hbFreq;
}
- /** {@inheritDoc} */
- @Override public String getIpFinderFormatted() {
+ /**
+ * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
+ *
+ * @return IPFinder (string representation).
+ */public String getIpFinderFormatted() {
return ipFinder.toString();
}
- /** {@inheritDoc} */
- @Override public long getNodesJoined() {
+ /**
+ * Gets joined nodes count.
+ *
+ * @return Nodes joined count.
+ */
+ public long getNodesJoined() {
return stats.joinedNodesCount();
}
- /** {@inheritDoc} */
- @Override public long getNodesLeft() {
+ /**
+ * Gets left nodes count.
+ *
+ * @return Left nodes count.
+ */
+ public long getNodesLeft() {
return stats.leftNodesCount();
}
- /** {@inheritDoc} */
- @Override public long getNodesFailed() {
+ /**
+ * Gets failed nodes count.
+ *
+ * @return Failed nodes count.
+ */
+ public long getNodesFailed() {
return stats.failedNodesCount();
}
- /** {@inheritDoc} */
- @Override public long getPendingMessagesRegistered() {
+ /**
+ * Gets pending messages registered count.
+ *
+ * @return Pending messages registered count.
+ */
+ public long getPendingMessagesRegistered() {
return stats.pendingMessagesRegistered();
}
- /** {@inheritDoc} */
- @Override public long getPendingMessagesDiscarded() {
+ /**
+ * Gets pending messages discarded count.
+ *
+ * @return Pending messages discarded count.
+ */
+ public long getPendingMessagesDiscarded() {
return stats.pendingMessagesDiscarded();
}
- /** {@inheritDoc} */
- @Override public long getAvgMessageProcessingTime() {
+ /**
+ * Gets avg message processing time.
+ *
+ * @return Avg message processing time.
+ */
+ public long getAvgMessageProcessingTime() {
return stats.avgMessageProcessingTime();
}
- /** {@inheritDoc} */
- @Override public long getMaxMessageProcessingTime() {
+ /**
+ * Gets max message processing time.
+ *
+ * @return Max message processing time.
+ */
+ public long getMaxMessageProcessingTime() {
return stats.maxMessageProcessingTime();
}
- /** {@inheritDoc} */
- @Override public int getTotalReceivedMessages() {
+ /**
+ * Gets total received messages count.
+ *
+ * @return Total received messages count.
+ */
+ public int getTotalReceivedMessages() {
return stats.totalReceivedMessages();
}
- /** {@inheritDoc} */
- @Override public Map<String, Integer> getReceivedMessages() {
+ /**
+ * Gets received messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ public Map<String, Integer> getReceivedMessages() {
return stats.receivedMessages();
}
- /** {@inheritDoc} */
- @Override public int getTotalProcessedMessages() {
+ /**
+ * Gets total processed messages count.
+ *
+ * @return Total processed messages count.
+ */
+ public int getTotalProcessedMessages() {
return stats.totalProcessedMessages();
}
- /** {@inheritDoc} */
- @Override public Map<String, Integer> getProcessedMessages() {
+ /**
+ * Gets processed messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ public Map<String, Integer> getProcessedMessages() {
return stats.processedMessages();
}
- /** {@inheritDoc} */
- @Override public long getCoordinatorSinceTimestamp() {
+ /**
+ * Gets time local node has been coordinator since.
+ *
+ * @return Time local node is coordinator since.
+ */
+ public long getCoordinatorSinceTimestamp() {
return stats.coordinatorSinceTimestamp();
}
@@ -1815,7 +1946,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
if (netTimeout < 3000)
U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
- registerMBean(igniteInstanceName, this, TcpDiscoverySpiMBean.class);
+ registerMBean(igniteInstanceName, new TcpDiscoverySpiMBeanImpl(this), TcpDiscoverySpiMBean.class);
if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
@@ -2000,6 +2131,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/** {@inheritDoc} */
+ @Override public TcpDiscoverySpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoverySpi.class, this);
}
@@ -2070,4 +2208,175 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return S.toString(SocketTimeoutObject.class, this);
}
}
+
+ /**
+ * MBean implementation for TcpDiscoverySpiMBean.
+ */
+ private class TcpDiscoverySpiMBeanImpl extends IgniteSpiMBeanAdapter implements TcpDiscoverySpiMBean {
+ /** @param spiAdapter SPI adapter passed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ TcpDiscoverySpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSpiState() {
+ return impl.getSpiState();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMessageWorkerQueueSize() {
+ return impl.getMessageWorkerQueueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public UUID getCoordinator() {
+ return impl.getCoordinator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dumpDebugInfo() {
+ impl.dumpDebugInfo(log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSocketTimeout() {
+ return TcpDiscoverySpi.this.getSocketTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaxAckTimeout() {
+ return TcpDiscoverySpi.this.getMaxAckTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getAckTimeout() {
+ return TcpDiscoverySpi.this.getAckTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNetworkTimeout() {
+ return TcpDiscoverySpi.this.getNetworkTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getJoinTimeout() {
+ return TcpDiscoverySpi.this.getJoinTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getLocalPort() {
+ return TcpDiscoverySpi.this.getLocalPort();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getLocalPortRange() {
+ return TcpDiscoverySpi.this.getLocalPortRange();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getIpFinderCleanFrequency() {
+ return TcpDiscoverySpi.this.getIpFinderCleanFrequency();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getThreadPriority() {
+ return TcpDiscoverySpi.this.getThreadPriority();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getHeartbeatFrequency() {
+ return TcpDiscoverySpi.this.getHeartbeatFrequency();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMaxMissedHeartbeats() {
+ return TcpDiscoverySpi.this.getMaxMissedHeartbeats();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMaxMissedClientHeartbeats() {
+ return TcpDiscoverySpi.this.getMaxMissedClientHeartbeats();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getStatisticsPrintFrequency() {
+ return TcpDiscoverySpi.this.getStatisticsPrintFrequency();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getIpFinderFormatted() {
+ return TcpDiscoverySpi.this.getIpFinderFormatted();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getReconnectCount() {
+ return TcpDiscoverySpi.this.getReconnectCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClientMode() {
+ return TcpDiscoverySpi.this.isClientMode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNodesJoined() {
+ return TcpDiscoverySpi.this.getNodesJoined();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNodesLeft() {
+ return TcpDiscoverySpi.this.getNodesLeft();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNodesFailed() {
+ return TcpDiscoverySpi.this.getNodesFailed();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getPendingMessagesRegistered() {
+ return TcpDiscoverySpi.this.getPendingMessagesRegistered();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getPendingMessagesDiscarded() {
+ return stats.pendingMessagesDiscarded();
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getAvgMessageProcessingTime() {
+ return TcpDiscoverySpi.this.getAvgMessageProcessingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaxMessageProcessingTime() {
+ return TcpDiscoverySpi.this.getMaxMessageProcessingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalReceivedMessages() {
+ return TcpDiscoverySpi.this.getTotalReceivedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Integer> getReceivedMessages() {
+ return TcpDiscoverySpi.this.getReceivedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalProcessedMessages() {
+ return TcpDiscoverySpi.this.getTotalProcessedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Integer> getProcessedMessages() {
+ return TcpDiscoverySpi.this.getProcessedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCoordinatorSinceTimestamp() {
+ return TcpDiscoverySpi.this.getCoordinatorSinceTimestamp();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
index 0e0aed5..1cd91f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
@@ -71,10 +71,13 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde
* with IP finder will be seen by IP finders on all other nodes.
*
* @param shared {@code true} if this IP finder is shared.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setShared(boolean shared) {
+ public TcpDiscoveryIpFinderAdapter setShared(boolean shared) {
this.shared = shared;
+
+ return this;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java
index a16f238..fbbda07 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java
@@ -263,10 +263,13 @@ public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter {
* Data source should be fully configured and ready-to-use.
*
* @param dataSrc Data source.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
- public void setDataSource(DataSource dataSrc) {
+ public TcpDiscoveryJdbcIpFinder setDataSource(DataSource dataSrc) {
this.dataSrc = dataSrc;
+
+ return this;
}
/**
@@ -275,10 +278,13 @@ public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter {
*
* @param initSchema {@code True} if DB schema should be initialized by Ignite (default behaviour),
* {code @false} if schema was explicitly created by user.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setInitSchema(boolean initSchema) {
+ public TcpDiscoveryJdbcIpFinder setInitSchema(boolean initSchema) {
this.initSchema = initSchema;
+
+ return this;
}
/**
@@ -404,6 +410,13 @@ public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/** {@inheritDoc} */
+ @Override public TcpDiscoveryJdbcIpFinder setShared(boolean shared) {
+ super.setShared(shared);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryJdbcIpFinder.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 8fe8a65..6c47014 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -153,10 +153,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
* If not provided, default value is {@link #DFLT_MCAST_GROUP}.
*
* @param mcastGrp Multicast IP address.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMulticastGroup(String mcastGrp) {
+ public TcpDiscoveryMulticastIpFinder setMulticastGroup(String mcastGrp) {
this.mcastGrp = mcastGrp;
+
+ return this;
}
/**
@@ -174,10 +177,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
* If not provided, default value is {@link #DFLT_MCAST_PORT}.
*
* @param mcastPort Multicast port number.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMulticastPort(int mcastPort) {
+ public TcpDiscoveryMulticastIpFinder setMulticastPort(int mcastPort) {
this.mcastPort = mcastPort;
+
+ return this;
}
/**
@@ -196,10 +202,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
* If not provided, default value is {@link #DFLT_RES_WAIT_TIME}.
*
* @param resWaitTime Time IP finder waits for reply to multicast address request.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setResponseWaitTime(int resWaitTime) {
+ public TcpDiscoveryMulticastIpFinder setResponseWaitTime(int resWaitTime) {
this.resWaitTime = resWaitTime;
+
+ return this;
}
/**
@@ -219,10 +228,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
* If not provided, default value is {@link #DFLT_ADDR_REQ_ATTEMPTS}.
*
* @param addrReqAttempts Number of attempts to send multicast address request.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setAddressRequestAttempts(int addrReqAttempts) {
+ public TcpDiscoveryMulticastIpFinder setAddressRequestAttempts(int addrReqAttempts) {
this.addrReqAttempts = addrReqAttempts;
+
+ return this;
}
/**
@@ -245,10 +257,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
*
* @param locAddr Local host address.
* @see org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi#setLocalAddress(String)
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setLocalAddress(String locAddr) {
+ public TcpDiscoveryMulticastIpFinder setLocalAddress(String locAddr) {
this.locAddr = locAddr;
+
+ return this;
}
/**
@@ -272,10 +287,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
* Default value is {@code -1} which corresponds to system default value.
*
* @param ttl Time to live.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setTimeToLive(int ttl) {
+ public TcpDiscoveryMulticastIpFinder setTimeToLive(int ttl) {
this.ttl = ttl;
+
+ return this;
}
/**
@@ -650,11 +668,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
}
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TcpDiscoveryMulticastIpFinder.class, this, "super", super.toString());
- }
-
/**
* @param e Network error to handle.
* @return {@code True} if this error is recoverable and the operation can be retried.
@@ -670,6 +683,18 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
return true;
}
+ /** {@inheritDoc} */
+ @Override public TcpDiscoveryMulticastIpFinder setShared(boolean shared) {
+ super.setShared(shared);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryMulticastIpFinder.class, this, "super", super.toString());
+ }
+
/**
* Response to multicast address request.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java
index d4e93d2..a30309c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java
@@ -112,10 +112,13 @@ public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter {
* Sets path.
*
* @param path Shared path.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setPath(String path) {
+ public TcpDiscoverySharedFsIpFinder setPath(String path) {
this.path = path;
+
+ return this;
}
/**
@@ -302,6 +305,13 @@ public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/** {@inheritDoc} */
+ @Override public TcpDiscoverySharedFsIpFinder setShared(boolean shared) {
+ super.setShared(shared);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoverySharedFsIpFinder.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java
index 94c237f..e2239b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java
@@ -129,11 +129,12 @@ public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter {
*
* @param addrs Known nodes addresses.
* @throws IgniteSpiException If any error occurs.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public synchronized void setAddresses(Collection<String> addrs) throws IgniteSpiException {
+ public synchronized TcpDiscoveryVmIpFinder setAddresses(Collection<String> addrs) throws IgniteSpiException {
if (F.isEmpty(addrs))
- return;
+ return this;
Collection<InetSocketAddress> newAddrs = new LinkedHashSet<>();
@@ -141,6 +142,8 @@ public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter {
newAddrs.addAll(address(ipStr));
this.addrs = newAddrs;
+
+ return this;
}
/**
@@ -261,6 +264,13 @@ public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/** {@inheritDoc} */
+ @Override public TcpDiscoveryVmIpFinder setShared(boolean shared) {
+ super.setShared(shared);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryVmIpFinder.class, this, "super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java
index dcfbde1..a61c236 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java
@@ -29,6 +29,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.eventstorage.EventStorageSpi;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -94,8 +95,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
* @see org.apache.ignite.spi.eventstorage.EventStorageSpi
*/
@IgniteSpiMultipleInstancesSupport(true)
-public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStorageSpi,
- MemoryEventStorageSpiMBean {
+public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStorageSpi {
/** Default event time to live value in milliseconds (value is {@link Long#MAX_VALUE}). */
public static final long DFLT_EXPIRE_AGE_MS = Long.MAX_VALUE;
@@ -131,10 +131,13 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor
* Sets filter for events to be recorded.
*
* @param filter Filter to use.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setFilter(IgnitePredicate<Event> filter) {
+ public MemoryEventStorageSpi setFilter(IgnitePredicate<Event> filter) {
this.filter = filter;
+
+ return this;
}
/** {@inheritDoc} */
@@ -151,7 +154,7 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor
log.debug(configInfo("expireCnt", expireCnt));
}
- registerMBean(igniteInstanceName, this, MemoryEventStorageSpiMBean.class);
+ registerMBean(igniteInstanceName, new MemoryEventStorageSpiMBeanImpl(this), MemoryEventStorageSpiMBean.class);
// Ack ok start.
if (log.isDebugEnabled())
@@ -171,16 +174,37 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor
}
/**
+ * See {@link #setExpireAgeMs(long)}
+ *
+ * @return Event time-to-live.
+ */
+ public long getExpireAgeMs() {
+ return expireAgeMs;
+ }
+
+ /**
* Sets events expiration time. All events that exceed this value
* will be removed from the queue when next event comes.
* <p>
* If not provided, default value is {@link #DFLT_EXPIRE_AGE_MS}.
*
* @param expireAgeMs Expiration time in milliseconds.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setExpireAgeMs(long expireAgeMs) {
+ public MemoryEventStorageSpi setExpireAgeMs(long expireAgeMs) {
this.expireAgeMs = expireAgeMs;
+
+ return this;
+ }
+
+ /**
+ * See {@link #setExpireCount(long)}
+ *
+ * @return Maximum event queue size.
+ */
+ public long getExpireCount() {
+ return expireCnt;
}
/**
@@ -189,29 +213,28 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor
* If not provided, default value {@link #DFLT_EXPIRE_COUNT} will be used.
*
* @param expireCnt Maximum queue size.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setExpireCount(long expireCnt) {
+ public MemoryEventStorageSpi setExpireCount(long expireCnt) {
this.expireCnt = expireCnt;
- }
- /** {@inheritDoc} */
- @Override public long getExpireAgeMs() {
- return expireAgeMs;
+ return this;
}
- /** {@inheritDoc} */
- @Override public long getExpireCount() {
- return expireCnt;
- }
-
- /** {@inheritDoc} */
- @Override public long getQueueSize() {
+ /**
+ * Gets current queue size of the event queue.
+ *
+ * @return Current queue size of the event queue.
+ */
+ public long getQueueSize() {
return evts.sizex();
}
- /** {@inheritDoc} */
- @Override public void clearAll() {
+ /**
+ * Removes all events from the event queue.
+ */
+ public void clearAll() {
evts.clear();
}
@@ -278,7 +301,44 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor
}
/** {@inheritDoc} */
+ @Override public MemoryEventStorageSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(MemoryEventStorageSpi.class, this);
}
+
+ /**
+ * MBean implementation for MemoryEventStorageSpi.
+ */
+ private class MemoryEventStorageSpiMBeanImpl extends IgniteSpiMBeanAdapter implements MemoryEventStorageSpiMBean {
+ /** @param spiAdapter SPI adapter passed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ MemoryEventStorageSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getExpireAgeMs() {
+ return MemoryEventStorageSpi.this.getExpireAgeMs();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getExpireCount() {
+ return MemoryEventStorageSpi.this.getExpireCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getQueueSize() {
+ return MemoryEventStorageSpi.this.getQueueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clearAll() {
+ MemoryEventStorageSpi.this.clearAll();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index 4b916e7..468a627 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -37,6 +37,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.failover.FailoverContext;
import org.apache.ignite.spi.failover.FailoverSpi;
@@ -95,7 +96,7 @@ import org.apache.ignite.spi.failover.FailoverSpi;
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = true)
-public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, AlwaysFailoverSpiMBean {
+public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi {
/** Maximum number of attempts to execute a failed job on another node (default is {@code 5}). */
public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 5;
@@ -124,8 +125,12 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
/** Number of jobs that were failed over. */
private int totalFailoverJobs;
- /** {@inheritDoc} */
- @Override public int getMaximumFailoverAttempts() {
+ /**
+ * See {@link #setMaximumFailoverAttempts(int)}.
+ *
+ * @return Maximum number of attempts to execute a failed job on another node.
+ */
+ public int getMaximumFailoverAttempts() {
return maxFailoverAttempts;
}
@@ -134,14 +139,21 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
* If not specified, {@link #DFLT_MAX_FAILOVER_ATTEMPTS} value will be used.
*
* @param maxFailoverAttempts Maximum number of attempts to execute a failed job on another node.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMaximumFailoverAttempts(int maxFailoverAttempts) {
+ public AlwaysFailoverSpi setMaximumFailoverAttempts(int maxFailoverAttempts) {
this.maxFailoverAttempts = maxFailoverAttempts;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getTotalFailoverJobsCount() {
+ /**
+ * Get total number of jobs that were failed over.
+ *
+ * @return Total number of failed over jobs.
+ */
+ public int getTotalFailoverJobsCount() {
return totalFailoverJobs;
}
@@ -160,7 +172,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
if (log.isDebugEnabled())
log.debug(configInfo("maximumFailoverAttempts", maxFailoverAttempts));
- registerMBean(igniteInstanceName, this, AlwaysFailoverSpiMBean.class);
+ registerMBean(igniteInstanceName, new AlwaysFailoverSpiMBeanImpl(this), AlwaysFailoverSpiMBean.class);
// Ack ok start.
if (log.isDebugEnabled())
@@ -286,7 +298,34 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
}
/** {@inheritDoc} */
+ @Override public AlwaysFailoverSpi setName(String name) {
+ super.setName(name); // Delegate to the parent setter; this override exists to return the concrete SPI type for chaining.
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(AlwaysFailoverSpi.class, this);
}
+
+ /**
+ * MBean implementation for AlwaysFailoverSpi; every getter delegates to the enclosing SPI instance.
+ */
+ private class AlwaysFailoverSpiMBeanImpl extends IgniteSpiMBeanAdapter implements AlwaysFailoverSpiMBean {
+ /** @param spiAdapter SPI adapter handed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ AlwaysFailoverSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMaximumFailoverAttempts() {
+ return AlwaysFailoverSpi.this.getMaximumFailoverAttempts();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalFailoverJobsCount() {
+ return AlwaysFailoverSpi.this.getTotalFailoverJobsCount();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java
index 05c681d..3ef32ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java
@@ -34,6 +34,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.failover.FailoverContext;
import org.apache.ignite.spi.failover.FailoverSpi;
@@ -98,8 +99,7 @@ import static org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSp
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = true)
-public class JobStealingFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
- JobStealingFailoverSpiMBean {
+public class JobStealingFailoverSpi extends IgniteSpiAdapter implements FailoverSpi {
/** Maximum number of attempts to execute a failed job on another node (default is {@code 5}). */
public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 5;
@@ -136,8 +136,12 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover
/** Number of jobs that were stolen. */
private int totalStolenJobs;
- /** {@inheritDoc} */
- @Override public int getMaximumFailoverAttempts() {
+ /**
+ * See {@link #setMaximumFailoverAttempts(int)}.
+ *
+ * @return Maximum number of attempts to execute a failed job on another node.
+ */
+ public int getMaximumFailoverAttempts() {
return maxFailoverAttempts;
}
@@ -151,19 +155,30 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover
*
* @param maxFailoverAttempts Maximum number of attempts to execute a failed
* job on another node.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMaximumFailoverAttempts(int maxFailoverAttempts) {
+ public JobStealingFailoverSpi setMaximumFailoverAttempts(int maxFailoverAttempts) {
this.maxFailoverAttempts = maxFailoverAttempts;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getTotalFailedOverJobsCount() {
+ /**
+ * Get total number of jobs that were failed over including stolen ones.
+ *
+ * @return Total number of failed over jobs.
+ */
+ public int getTotalFailedOverJobsCount() {
return totalFailedOverJobs;
}
- /** {@inheritDoc} */
- @Override public int getTotalStolenJobsCount() {
+ /**
+ * Get total number of jobs that were stolen.
+ *
+ * @return Total number of stolen jobs.
+ */
+ public int getTotalStolenJobsCount() {
return totalStolenJobs;
}
@@ -182,7 +197,7 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover
if (log.isDebugEnabled())
log.debug(configInfo("maxFailoverAttempts", maxFailoverAttempts));
- registerMBean(igniteInstanceName, this, JobStealingFailoverSpiMBean.class);
+ registerMBean(igniteInstanceName, new JobStealingFailoverSpiMBeanImpl(this), JobStealingFailoverSpiMBean.class);
// Ack ok start.
if (log.isDebugEnabled())
@@ -355,7 +370,40 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover
}
/** {@inheritDoc} */
+ @Override public JobStealingFailoverSpi setName(String name) {
+ super.setName(name); // Delegate to the parent setter; this override exists to return the concrete SPI type for chaining.
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(JobStealingFailoverSpi.class, this);
}
+
+ /**
+ * MBean implementation for JobStealingFailoverSpi; every getter delegates to the enclosing SPI instance.
+ */
+ private class JobStealingFailoverSpiMBeanImpl extends IgniteSpiMBeanAdapter implements JobStealingFailoverSpiMBean {
+ /** @param spiAdapter SPI adapter handed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ JobStealingFailoverSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMaximumFailoverAttempts() {
+ return JobStealingFailoverSpi.this.getMaximumFailoverAttempts();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalFailedOverJobsCount() {
+ return JobStealingFailoverSpi.this.getTotalFailedOverJobsCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalStolenJobsCount() {
+ return JobStealingFailoverSpi.this.getTotalStolenJobsCount();
+ }
+
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java
index 1056d2e..ffd695e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.failover.FailoverContext;
import org.apache.ignite.spi.failover.FailoverSpi;
@@ -32,7 +33,8 @@ import org.apache.ignite.spi.failover.FailoverSpi;
/**
* This class provides failover SPI implementation that never fails over. This implementation
* never fails over a failed job by always returning {@code null} out of
- * {@link org.apache.ignite.spi.failover.FailoverSpi#failover(org.apache.ignite.spi.failover.FailoverContext, List)} method.
+ * {@link org.apache.ignite.spi.failover.FailoverSpi#failover(org.apache.ignite.spi.failover.FailoverContext, List)}
+ * method.
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* This SPI has no mandatory configuration parameters.
@@ -54,17 +56,18 @@ import org.apache.ignite.spi.failover.FailoverSpi;
* Here is an example on how to configure grid with {@link NeverFailoverSpi} from Spring XML configuration file:
* <pre name="code" class="xml">
* <property name="failoverSpi">
- * <bean class="org.apache.ignite.spi.failover.never.NeverFailoverSpi"/>
+ * <bean class="org.apache.ignite.spi.failover.never.NeverFailoverSpi"/>
* </property>
* </pre>
* <p>
* <img src="http://ignite.apache.org/images/spring-small.png">
* <br>
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
+ *
* @see org.apache.ignite.spi.failover.FailoverSpi
*/
@IgniteSpiMultipleInstancesSupport(true)
-public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, NeverFailoverSpiMBean {
+public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi {
/** Injected grid logger. */
@LoggerResource
private IgniteLogger log;
@@ -74,7 +77,7 @@ public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, N
// Start SPI start stopwatch.
startStopwatch();
- registerMBean(igniteInstanceName, this, NeverFailoverSpiMBean.class);
+ registerMBean(igniteInstanceName, new NeverFailoverSpiMBeanImpl(this), NeverFailoverSpiMBean.class);
// Ack ok start.
if (log.isDebugEnabled())
@@ -93,14 +96,31 @@ public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, N
/** {@inheritDoc} */
@Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) {
U.warn(log, "Returning 'null' node for failed job (failover will not happen) [job=" +
- ctx.getJobResult().getJob() + ", task=" + ctx.getTaskSession().getTaskName() +
+ ctx.getJobResult().getJob() + ", task=" + ctx.getTaskSession().getTaskName() +
", sessionId=" + ctx.getTaskSession().getId() + ']');
return null;
}
/** {@inheritDoc} */
+ @Override public NeverFailoverSpi setName(String name) {
+ super.setName(name); // Delegate to the parent setter; this override exists to return the concrete SPI type for chaining.
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(NeverFailoverSpi.class, this);
}
+
+ /**
+ * MBean implementation for NeverFailoverSpi; declares no members beyond those of {@link IgniteSpiMBeanAdapter}.
+ */
+ private class NeverFailoverSpiMBeanImpl extends IgniteSpiMBeanAdapter implements NeverFailoverSpiMBean {
+ /** @param spiAdapter SPI adapter handed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ NeverFailoverSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java
index 5c8bfd2..a8683a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java
@@ -68,4 +68,11 @@ public class NoopIndexingSpi extends IgniteSpiAdapter implements IndexingSpi {
@Override public void spiStop() throws IgniteSpiException {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public NoopIndexingSpi setName(String name) {
+ super.setName(name); // Delegate to the parent setter; this override exists to return the concrete SPI type for chaining.
+
+ return this;
+ }
}
\ No newline at end of file