You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/06 13:31:11 UTC
[04/50] [abbrv] ignite git commit: IGNITE-4564: All setters on public
configuration now return "this" instance to allow convenient chaining. This
closes #1449.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java
index 619c468..35cb62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/cache/CacheCheckpointSpi.java
@@ -30,6 +30,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.checkpoint.CheckpointListener;
import org.apache.ignite.spi.checkpoint.CheckpointSpi;
@@ -101,7 +102,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
* @see org.apache.ignite.spi.checkpoint.CheckpointSpi
*/
@IgniteSpiMultipleInstancesSupport(true)
-public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi, CacheCheckpointSpiMBean {
+public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi {
/** Default cache name (value is <tt>checkpoints</tt>). */
public static final String DFLT_CACHE_NAME = "checkpoints";
@@ -124,14 +125,21 @@ public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSp
* If cache name is not provided {@link #DFLT_CACHE_NAME} is used.
*
* @param cacheName Cache name.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setCacheName(String cacheName) {
+ public CacheCheckpointSpi setCacheName(String cacheName) {
this.cacheName = cacheName;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public String getCacheName() {
+ /**
+ * Gets cache name to be used by this SPI.
+ *
+ * @return Cache name to be used by this SPI.
+ */
+ public String getCacheName() {
return cacheName;
}
@@ -146,7 +154,7 @@ public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSp
if (log.isDebugEnabled())
log.debug(configInfo("cacheName", cacheName));
- registerMBean(igniteInstanceName, this, CacheCheckpointSpiMBean.class);
+ registerMBean(igniteInstanceName, new CacheCheckpointSpiMBeanImpl(this), CacheCheckpointSpiMBean.class);
if (log.isDebugEnabled())
log.debug(startInfo());
@@ -247,7 +255,29 @@ public class CacheCheckpointSpi extends IgniteSpiAdapter implements CheckpointSp
}
/** {@inheritDoc} */
+ @Override public CacheCheckpointSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheCheckpointSpi.class, this);
}
+
+ /**
+ * MBean implementation for CacheCheckpointSpi.
+ */
+ private class CacheCheckpointSpiMBeanImpl extends IgniteSpiMBeanAdapter implements CacheCheckpointSpiMBean {
+ /** {@inheritDoc} */
+ CacheCheckpointSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCacheName() {
+ return CacheCheckpointSpi.this.getCacheName();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java
index a052704..744ce59 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/jdbc/JdbcCheckpointSpi.java
@@ -32,6 +32,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.checkpoint.CheckpointListener;
import org.apache.ignite.spi.checkpoint.CheckpointSpi;
@@ -111,7 +112,7 @@ import org.apache.ignite.spi.checkpoint.CheckpointSpi;
*/
@SuppressWarnings({"JDBCResourceOpenedButNotSafelyClosed", "JDBCExecuteWithNonConstantString"})
@IgniteSpiMultipleInstancesSupport(true)
-public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi, JdbcCheckpointSpiMBean {
+public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi {
/** Default number of retries in case of errors (value is {@code 2}). */
public static final int DFLT_NUMBER_OF_RETRIES = 2;
@@ -242,58 +243,102 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
/** Listener. */
private CheckpointListener lsnr;
- /** {@inheritDoc} */
- @Override public int getNumberOfRetries() {
+ /**
+ * Gets number of retries in case of DB failure.
+ *
+ * @return Number of retries.
+ */
+ public int getNumberOfRetries() {
return retryNum;
}
- /** {@inheritDoc} */
- @Override public String getDataSourceInfo() {
+ /**
+ * Gets data source description.
+ *
+ * @return Description for data source.
+ */
+ public String getDataSourceInfo() {
return dataSrc.toString();
}
- /** {@inheritDoc} */
- @Override public String getUser() {
+ /**
+ * Gets checkpoint jdbc user name.
+ *
+ * @return User name for checkpoint jdbc.
+ */
+ public String getUser() {
return user;
}
- /** {@inheritDoc} */
- @Override public String getPwd() {
+ /**
+ * Gets checkpoint jdbc password.
+ *
+ * @return Password for checkpoint jdbc.
+ */
+ public String getPwd() {
return pwd;
}
- /** {@inheritDoc} */
- @Override public String getCheckpointTableName() {
+ /**
+ * Gets checkpoint table name.
+ *
+ * @return Checkpoint table name.
+ */
+ public String getCheckpointTableName() {
return tblName;
}
- /** {@inheritDoc} */
- @Override public String getKeyFieldName() {
+ /**
+ * Gets key field name for checkpoint table.
+ *
+ * @return Key field name for checkpoint table.
+ */
+ public String getKeyFieldName() {
return keyName;
}
- /** {@inheritDoc} */
- @Override public String getKeyFieldType() {
+ /**
+ * Gets key field type for checkpoint table.
+ *
+ * @return Key field type for checkpoint table.
+ */
+ public String getKeyFieldType() {
return keyType;
}
- /** {@inheritDoc} */
- @Override public String getValueFieldName() {
+ /**
+ * Gets value field name for checkpoint table.
+ *
+ * @return Value field name for checkpoint table.
+ */
+ public String getValueFieldName() {
return valName;
}
- /** {@inheritDoc} */
- @Override public String getValueFieldType() {
+ /**
+ * Gets value field type for checkpoint table.
+ *
+ * @return Value field type for checkpoint table.
+ */
+ public String getValueFieldType() {
return valType;
}
- /** {@inheritDoc} */
- @Override public String getExpireDateFieldName() {
+ /**
+ * Gets expiration date field name for checkpoint table.
+ *
+ * @return Expiration date field name for checkpoint table.
+ */
+ public String getExpireDateFieldName() {
return expDateName;
}
- /** {@inheritDoc} */
- @Override public String getExpireDateFieldType() {
+ /**
+ * Gets expiration date field type for checkpoint table.
+ *
+ * @return Expiration date field type for checkpoint table.
+ */
+ public String getExpireDateFieldType() {
return expDateType;
}
@@ -306,10 +351,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* this SPI from Spring configuration file. Refer to {@code Apache DBCP} project for more information.
*
* @param dataSrc DataSource object to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
- public void setDataSource(DataSource dataSrc) {
+ public JdbcCheckpointSpi setDataSource(DataSource dataSrc) {
this.dataSrc = dataSrc;
+
+ return this;
}
/**
@@ -317,10 +365,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* the value is {@link #DFLT_NUMBER_OF_RETRIES}.
*
* @param retryNum Number of retries in case of any database errors.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setNumberOfRetries(int retryNum) {
+ public JdbcCheckpointSpi setNumberOfRetries(int retryNum) {
this.retryNum = retryNum;
+
+ return this;
}
/**
@@ -329,10 +380,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
*
* @param user Checkpoint database user name to set.
* @see #setPwd(String)
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setUser(String user) {
+ public JdbcCheckpointSpi setUser(String user) {
this.user = user;
+
+ return this;
}
/**
@@ -341,20 +395,26 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
*
* @param pwd Checkpoint database password to set.
* @see #setUser(String)
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setPwd(String pwd) {
+ public JdbcCheckpointSpi setPwd(String pwd) {
this.pwd = pwd;
+
+ return this;
}
/**
* Sets checkpoint table name. By default {@link #DFLT_CHECKPOINT_TABLE_NAME} is used.
*
* @param tblName Checkpoint table name to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setCheckpointTableName(String tblName) {
+ public JdbcCheckpointSpi setCheckpointTableName(String tblName) {
this.tblName = tblName;
+
+ return this;
}
/**
@@ -363,10 +423,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* change key field type (see {@link #setKeyFieldType(String)}).
*
* @param keyName Checkpoint key field name to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setKeyFieldName(String keyName) {
+ public JdbcCheckpointSpi setKeyFieldName(String keyName) {
this.keyName = keyName;
+
+ return this;
}
/**
@@ -375,10 +438,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* By default {@link #DFLT_EXPIRE_DATE_FIELD_TYPE} is used.
*
* @param keyType Checkpoint key field type to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setKeyFieldType(String keyType) {
+ public JdbcCheckpointSpi setKeyFieldType(String keyType) {
this.keyType = keyType;
+
+ return this;
}
/**
@@ -387,10 +453,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* (see {@link #setValueFieldType(String)}).
*
* @param valName Checkpoint value field name to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setValueFieldName(String valName) {
+ public JdbcCheckpointSpi setValueFieldName(String valName) {
this.valName = valName;
+
+ return this;
}
/**
@@ -400,10 +469,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* then the type should be {@code longvarbinary}.
*
* @param valType Checkpoint value field type to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setValueFieldType(String valType) {
+ public JdbcCheckpointSpi setValueFieldType(String valType) {
this.valType = valType;
+
+ return this;
}
/**
@@ -413,10 +485,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* (see {@link #setExpireDateFieldType(String)}).
*
* @param expDateName Checkpoint expiration date field name to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setExpireDateFieldName(String expDateName) {
+ public JdbcCheckpointSpi setExpireDateFieldName(String expDateName) {
this.expDateName = expDateName;
+
+ return this;
}
/**
@@ -425,10 +500,13 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
* corresponding SQL {@code DATETIME} type.
*
* @param expDateType Checkpoint expiration date field type to set.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setExpireDateFieldType(String expDateType) {
+ public JdbcCheckpointSpi setExpireDateFieldType(String expDateType) {
this.expDateType = expDateType;
+
+ return this;
}
/**
@@ -880,4 +958,77 @@ public class JdbcCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
@Override public void setCheckpointListener(CheckpointListener lsnr) {
this.lsnr = lsnr;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public JdbcCheckpointSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /**
+ * MBean implementation for JdbcCheckpointSpi.
+ */
+ private class JdbcCheckpointSpiMBeanImpl extends IgniteSpiMBeanAdapter implements JdbcCheckpointSpiMBean {
+ /** {@inheritDoc} */
+ JdbcCheckpointSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getNumberOfRetries() {
+ return JdbcCheckpointSpi.this.getNumberOfRetries();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDataSourceInfo() {
+ return JdbcCheckpointSpi.this.getDataSourceInfo();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getUser() {
+ return JdbcCheckpointSpi.this.getUser();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getPwd() {
+ return JdbcCheckpointSpi.this.getPwd();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCheckpointTableName() {
+ return JdbcCheckpointSpi.this.getCheckpointTableName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getKeyFieldName() {
+ return JdbcCheckpointSpi.this.getKeyFieldName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getKeyFieldType() {
+ return JdbcCheckpointSpi.this.getKeyFieldType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getValueFieldName() {
+ return JdbcCheckpointSpi.this.getValueFieldName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getValueFieldType() {
+ return JdbcCheckpointSpi.this.getValueFieldType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getExpireDateFieldName() {
+ return JdbcCheckpointSpi.this.getExpireDateFieldName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getExpireDateFieldType() {
+ return JdbcCheckpointSpi.this.getExpireDateFieldType();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
index c3ac202..d0bf2d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
@@ -71,6 +71,13 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
}
/** {@inheritDoc} */
+ @Override public NoopCheckpointSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(NoopCheckpointSpi.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
index 1917d38..29a7ec1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
@@ -42,6 +42,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.checkpoint.CheckpointListener;
import org.apache.ignite.spi.checkpoint.CheckpointSpi;
@@ -118,8 +119,7 @@ import org.jetbrains.annotations.Nullable;
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
-public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi,
- SharedFsCheckpointSpiMBean {
+public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi {
/**
* Default checkpoint directory. Note that this path is relative to {@code IGNITE_HOME/work} folder
* if {@code IGNITE_HOME} system or environment variable specified, otherwise it is relative to
@@ -177,13 +177,21 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin
dirPaths.offer(DFLT_DIR_PATH);
}
- /** {@inheritDoc} */
- @Override public Collection<String> getDirectoryPaths() {
+ /**
+ * Gets collection of all configured paths where checkpoints can be saved.
+ *
+ * @return Collection of all configured paths.
+ */
+ public Collection<String> getDirectoryPaths() {
return dirPaths;
}
- /** {@inheritDoc} */
- @Override public String getCurrentDirectoryPath() {
+ /**
+ * Gets path to the directory where all checkpoints are saved.
+ *
+ * @return Path to the checkpoints directory.
+ */
+ public String getCurrentDirectoryPath() {
return curDirPath;
}
@@ -196,13 +204,16 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin
*
* @param dirPaths Absolute or Ignite installation home folder relative path where checkpoints
* will be stored.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setDirectoryPaths(Collection<String> dirPaths) {
+ public SharedFsCheckpointSpi setDirectoryPaths(Collection<String> dirPaths) {
A.ensure(!F.isEmpty(dirPaths), "!F.isEmpty(dirPaths)");
this.dirPaths.clear();
this.dirPaths.addAll(dirPaths);
+
+ return this;
}
/** {@inheritDoc} */
@@ -227,7 +238,7 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin
if (!folder.isDirectory())
throw new IgniteSpiException("Checkpoint directory path is not a valid directory: " + curDirPath);
- registerMBean(igniteInstanceName, this, SharedFsCheckpointSpiMBean.class);
+ registerMBean(igniteInstanceName, new SharedFsCheckpointSpiMBeanImpl(this), SharedFsCheckpointSpiMBean.class);
// Ack parameters.
if (log.isDebugEnabled()) {
@@ -506,7 +517,34 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin
}
/** {@inheritDoc} */
+ @Override public SharedFsCheckpointSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(SharedFsCheckpointSpi.class, this);
}
+
+ /**
+ * MBean implementation for SharedFsCheckpointSpi.
+ */
+ private class SharedFsCheckpointSpiMBeanImpl extends IgniteSpiMBeanAdapter implements SharedFsCheckpointSpiMBean {
+ /** {@inheritDoc} */
+ SharedFsCheckpointSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> getDirectoryPaths() {
+ return SharedFsCheckpointSpi.this.getDirectoryPaths();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCurrentDirectoryPath() {
+ return SharedFsCheckpointSpi.this.getCurrentDirectoryPath();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java
index d94b453..703e90e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpi.java
@@ -26,6 +26,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.collision.CollisionContext;
import org.apache.ignite.spi.collision.CollisionExternalListener;
@@ -78,8 +79,7 @@ import org.apache.ignite.spi.collision.CollisionSpi;
* </pre>
*/
@IgniteSpiMultipleInstancesSupport(true)
-public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi,
- FifoQueueCollisionSpiMBean {
+public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi {
/**
* Default number of parallel jobs allowed (set to number of cores times 2).
*/
@@ -110,49 +110,88 @@ public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements Collision
/** Number of jobs that are held. */
private volatile int heldCnt;
- /** {@inheritDoc} */
- @Override public int getParallelJobsNumber() {
+ /**
+ * See {@link #setParallelJobsNumber(int)}
+ *
+ * @return Number of jobs that can be executed in parallel.
+ */
+ public int getParallelJobsNumber() {
return parallelJobsNum;
}
- /** {@inheritDoc} */
+ /**
+ * Sets number of jobs that can be executed in parallel.
+ *
+ * @param parallelJobsNum Parallel jobs number.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setParallelJobsNumber(int parallelJobsNum) {
+ public FifoQueueCollisionSpi setParallelJobsNumber(int parallelJobsNum) {
A.ensure(parallelJobsNum > 0, "parallelJobsNum > 0");
this.parallelJobsNum = parallelJobsNum;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getWaitingJobsNumber() {
+ /**
+ * See {@link #setWaitingJobsNumber(int)}
+ *
+ * @return Maximum allowed number of waiting jobs.
+ */
+ public int getWaitingJobsNumber() {
return waitJobsNum;
}
- /** {@inheritDoc} */
+ /**
+ * Sets maximum number of jobs that are allowed to wait in waiting queue. If number
+ * of waiting jobs ever exceeds this number, excessive jobs will be rejected.
+ *
+ * @param waitJobsNum Waiting jobs number.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setWaitingJobsNumber(int waitJobsNum) {
+ public FifoQueueCollisionSpi setWaitingJobsNumber(int waitJobsNum) {
A.ensure(waitJobsNum >= 0, "waitingJobsNum >= 0");
this.waitJobsNum = waitJobsNum;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getCurrentWaitJobsNumber() {
+ /**
+ * Gets current number of jobs that wait for the execution.
+ *
+ * @return Number of jobs that wait for execution.
+ */
+ public int getCurrentWaitJobsNumber() {
return waitingCnt;
}
- /** {@inheritDoc} */
- @Override public int getCurrentActiveJobsNumber() {
+ /**
+ * Gets current number of jobs that are active, i.e. {@code 'running + held'} jobs.
+ *
+ * @return Number of active jobs.
+ */
+ public int getCurrentActiveJobsNumber() {
return runningCnt + heldCnt;
}
- /** {@inheritDoc} */
- @Override public int getCurrentRunningJobsNumber() {
+ /**
+ * Gets number of currently running (not {@code 'held'}) jobs.
+ *
+ * @return Number of currently running (not {@code 'held'}) jobs.
+ */
+ public int getCurrentRunningJobsNumber() {
return runningCnt;
}
- /** {@inheritDoc} */
- @Override public int getCurrentHeldJobsNumber() {
+ /**
+ * Gets number of currently {@code 'held'} jobs.
+ *
+ * @return Number of currently {@code 'held'} jobs.
+ */
+ public int getCurrentHeldJobsNumber() {
return heldCnt;
}
@@ -168,7 +207,7 @@ public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements Collision
if (log.isDebugEnabled())
log.debug(configInfo("parallelJobsNum", parallelJobsNum));
- registerMBean(igniteInstanceName, this, FifoQueueCollisionSpiMBean.class);
+ registerMBean(igniteInstanceName, new FifoQueueCollisionSpiMBeanImpl(this), FifoQueueCollisionSpiMBean.class);
// Ack start.
if (log.isDebugEnabled())
@@ -252,7 +291,64 @@ public class FifoQueueCollisionSpi extends IgniteSpiAdapter implements Collision
}
/** {@inheritDoc} */
+ @Override public FifoQueueCollisionSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(FifoQueueCollisionSpi.class, this);
}
+
+ /**
+ * MBean implementation for FifoQueueCollisionSpi.
+ */
+ private class FifoQueueCollisionSpiMBeanImpl extends IgniteSpiMBeanAdapter implements FifoQueueCollisionSpiMBean {
+ /** {@inheritDoc} */
+ FifoQueueCollisionSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getParallelJobsNumber() {
+ return FifoQueueCollisionSpi.this.getParallelJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentWaitJobsNumber() {
+ return FifoQueueCollisionSpi.this.getCurrentWaitJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentActiveJobsNumber() {
+ return FifoQueueCollisionSpi.this.getCurrentActiveJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentRunningJobsNumber() {
+ return FifoQueueCollisionSpi.this.getCurrentRunningJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentHeldJobsNumber() {
+ return FifoQueueCollisionSpi.this.getCurrentHeldJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getWaitingJobsNumber() {
+ return FifoQueueCollisionSpi.this.getWaitingJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWaitingJobsNumber(int waitJobsNum) {
+ FifoQueueCollisionSpi.this.setWaitingJobsNumber(waitJobsNum);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setParallelJobsNumber(int parallelJobsNum) {
+ FifoQueueCollisionSpi.this.setParallelJobsNumber(parallelJobsNum);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java
index 40d47a2..59283e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/fifoqueue/FifoQueueCollisionSpiMBean.java
@@ -75,7 +75,7 @@ public interface FifoQueueCollisionSpiMBean extends IgniteSpiManagementMBean {
@MXBeanDescription("Number of active jobs.")
public int getCurrentActiveJobsNumber();
- /*
+ /**
* Gets number of currently running (not {@code 'held}) jobs.
*
* @return Number of currently running (not {@code 'held}) jobs.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
index 37db103..8a02225 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
@@ -49,6 +49,7 @@ import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.collision.CollisionContext;
import org.apache.ignite.spi.collision.CollisionExternalListener;
@@ -185,8 +186,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = true)
-public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi,
- JobStealingCollisionSpiMBean {
+public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi {
/** Maximum number of attempts to steal job by another node (default is {@code 5}). */
public static final int DFLT_MAX_STEALING_ATTEMPTS = 5;
@@ -305,66 +305,133 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
/** */
private Comparator<CollisionJobContext> cmp;
- /** {@inheritDoc} */
+ /**
+ * Sets number of jobs that can be executed in parallel.
+ *
+ * @param activeJobsThreshold Number of jobs that can be executed in parallel.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setActiveJobsThreshold(int activeJobsThreshold) {
+ public JobStealingCollisionSpi setActiveJobsThreshold(int activeJobsThreshold) {
A.ensure(activeJobsThreshold >= 0, "activeJobsThreshold >= 0");
this.activeJobsThreshold = activeJobsThreshold;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getActiveJobsThreshold() {
+ /**
+ * See {@link #setActiveJobsThreshold(int)}.
+ *
+ * @return Number of jobs that can be executed in parallel.
+ */
+ public int getActiveJobsThreshold() {
return activeJobsThreshold;
}
- /** {@inheritDoc} */
+ /**
+ * Sets job count threshold at which this node will
+ * start stealing jobs from other nodes.
+ *
+ * @param waitJobsThreshold Job count threshold.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setWaitJobsThreshold(int waitJobsThreshold) {
+ public JobStealingCollisionSpi setWaitJobsThreshold(int waitJobsThreshold) {
A.ensure(waitJobsThreshold >= 0, "waitJobsThreshold >= 0");
this.waitJobsThreshold = waitJobsThreshold;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getWaitJobsThreshold() {
+ /**
+ * See {@link #setWaitJobsThreshold(int)}.
+ *
+ * @return Job count threshold.
+ */
+ public int getWaitJobsThreshold() {
return waitJobsThreshold;
}
- /** {@inheritDoc} */
+ /**
+ * Message expire time configuration parameter. If no response is received
+ * from a busy node to a job stealing message, then implementation will
+ * assume that message never got there, or that remote node does not have
+ * this node included into topology of any of the jobs it has.
+ *
+ * @param msgExpireTime Message expire time.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setMessageExpireTime(long msgExpireTime) {
+ public JobStealingCollisionSpi setMessageExpireTime(long msgExpireTime) {
A.ensure(msgExpireTime > 0, "messageExpireTime > 0");
this.msgExpireTime = msgExpireTime;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public long getMessageExpireTime() {
+ /**
+ * See {@link #setMessageExpireTime(long)}.
+ *
+ * @return Message expire time.
+ */
+ public long getMessageExpireTime() {
return msgExpireTime;
}
- /** {@inheritDoc} */
+ /**
+ * Sets flag indicating whether this node should attempt to steal jobs
+ * from other nodes. If {@code false}, then this node will still allow
+ * jobs to be stolen from it, but won't attempt to steal any jobs from
+ * other nodes.
+ * <p>
+ * Default value is {@code true}.
+ *
+ * @param isStealingEnabled Flag indicating whether this node should attempt to steal jobs
+ * from other nodes.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setStealingEnabled(boolean isStealingEnabled) {
+ public JobStealingCollisionSpi setStealingEnabled(boolean isStealingEnabled) {
this.isStealingEnabled = isStealingEnabled;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public boolean isStealingEnabled() {
+ /**
+ * See {@link #setStealingEnabled(boolean)}.
+ *
+ * @return Flag indicating whether this node should attempt to steal jobs
+ * from other nodes.
+ */
+ public boolean isStealingEnabled() {
return isStealingEnabled;
}
- /** {@inheritDoc} */
+ /**
+ * Sets maximum number of attempts to steal job by another node.
+ * If not specified, {@link JobStealingCollisionSpi#DFLT_MAX_STEALING_ATTEMPTS}
+ * value will be used.
+ *
+ * @param maxStealingAttempts Maximum number of attempts to steal job by another node.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setMaximumStealingAttempts(int maxStealingAttempts) {
+ public JobStealingCollisionSpi setMaximumStealingAttempts(int maxStealingAttempts) {
A.ensure(maxStealingAttempts > 0, "maxStealingAttempts > 0");
this.maxStealingAttempts = maxStealingAttempts;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getMaximumStealingAttempts() {
+ /**
+ * See {@link #setMaximumStealingAttempts(int)}.
+ *
+ * @return Maximum number of attempts to steal job by another node.
+ */
+ public int getMaximumStealingAttempts() {
return maxStealingAttempts;
}
@@ -374,47 +441,80 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
* {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} methods).
*
* @param stealAttrs Node attributes to enable job stealing for.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setStealingAttributes(Map<String, ? extends Serializable> stealAttrs) {
+ public JobStealingCollisionSpi setStealingAttributes(Map<String, ? extends Serializable> stealAttrs) {
this.stealAttrs = stealAttrs;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public Map<String, ? extends Serializable> getStealingAttributes() {
+ /**
+ * See {@link #setStealingAttributes(Map)}.
+ *
+ * @return Node attributes to enable job stealing for.
+ */
+ public Map<String, ? extends Serializable> getStealingAttributes() {
return stealAttrs;
}
- /** {@inheritDoc} */
- @Override public int getCurrentRunningJobsNumber() {
+ /**
+ * Gets number of currently running (not {@code 'held'}) jobs.
+ *
+ * @return Number of currently running (not {@code 'held'}) jobs.
+ */
+ public int getCurrentRunningJobsNumber() {
return runningNum;
}
- /** {@inheritDoc} */
- @Override public int getCurrentHeldJobsNumber() {
+ /**
+ * Gets number of currently {@code 'held'} jobs.
+ *
+ * @return Number of currently {@code 'held'} jobs.
+ */
+ public int getCurrentHeldJobsNumber() {
return heldNum;
}
- /** {@inheritDoc} */
- @Override public int getCurrentWaitJobsNumber() {
+ /**
+ * Gets current number of jobs that wait for the execution.
+ *
+ * @return Number of jobs that wait for execution.
+ */
+ public int getCurrentWaitJobsNumber() {
return waitingNum;
}
- /** {@inheritDoc} */
- @Override public int getCurrentActiveJobsNumber() {
+ /**
+ * Gets current number of jobs that are being executed.
+ *
+ * @return Number of active jobs.
+ */
+ public int getCurrentActiveJobsNumber() {
return runningNum + heldNum;
}
- /** {@inheritDoc} */
- @Override public int getTotalStolenJobsNumber() {
+ /**
+ * Gets total number of stolen jobs.
+ *
+ * @return Number of stolen jobs.
+ */
+ public int getTotalStolenJobsNumber() {
return totalStolenJobsNum.get();
}
- /** {@inheritDoc} */
- @Override public int getCurrentJobsToStealNumber() {
+ /**
+ * Gets current number of jobs to be stolen. This is outstanding
+ * requests number.
+ *
+ * @return Number of jobs to be stolen.
+ */
+ public int getCurrentJobsToStealNumber() {
return stealReqs.get();
}
+
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
HashMap<String, Object> res = new HashMap<>(4);
@@ -445,7 +545,8 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
log.debug(configInfo("maxStealingAttempts", maxStealingAttempts));
}
- registerMBean(igniteInstanceName, this, JobStealingCollisionSpiMBean.class);
+ registerMBean(igniteInstanceName, new JobStealingCollisionSpiMBeanImpl(this),
+ JobStealingCollisionSpiMBean.class);
// Ack start.
if (log.isDebugEnabled())
@@ -698,7 +799,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
// requested to be stolen. Note, that we use lose total steal request
// counter to prevent excessive iteration over nodes under load.
for (Iterator<Entry<UUID, MessageInfo>> iter = rcvMsgMap.entrySet().iterator();
- iter.hasNext() && stealReqs.get() > 0;) {
+ iter.hasNext() && stealReqs.get() > 0;) {
Entry<UUID, MessageInfo> entry = iter.next();
UUID nodeId = entry.getKey();
@@ -998,6 +1099,13 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
}
/** {@inheritDoc} */
+ @Override public JobStealingCollisionSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(JobStealingCollisionSpi.class, this);
}
@@ -1047,4 +1155,99 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
}
}
+ /**
+ * MBean implementation for JobStealingCollisionSpi.
+ */
+ private class JobStealingCollisionSpiMBeanImpl extends IgniteSpiMBeanAdapter
+ implements JobStealingCollisionSpiMBean {
+ /** @param spiAdapter SPI adapter passed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ JobStealingCollisionSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, ? extends Serializable> getStealingAttributes() {
+ return JobStealingCollisionSpi.this.getStealingAttributes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentRunningJobsNumber() {
+ return JobStealingCollisionSpi.this.getCurrentRunningJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentHeldJobsNumber() {
+ return JobStealingCollisionSpi.this.getCurrentHeldJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentWaitJobsNumber() {
+ return JobStealingCollisionSpi.this.getCurrentWaitJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentActiveJobsNumber() {
+ return JobStealingCollisionSpi.this.getCurrentActiveJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalStolenJobsNumber() {
+ return JobStealingCollisionSpi.this.getTotalStolenJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentJobsToStealNumber() {
+ return JobStealingCollisionSpi.this.getCurrentJobsToStealNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setActiveJobsThreshold(int activeJobsThreshold) {
+ JobStealingCollisionSpi.this.setActiveJobsThreshold(activeJobsThreshold);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getActiveJobsThreshold() {
+ return JobStealingCollisionSpi.this.getActiveJobsThreshold();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWaitJobsThreshold(int waitJobsThreshold) {
+ JobStealingCollisionSpi.this.setWaitJobsThreshold(waitJobsThreshold);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getWaitJobsThreshold() {
+ return JobStealingCollisionSpi.this.getWaitJobsThreshold();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMessageExpireTime(long msgExpireTime) {
+ JobStealingCollisionSpi.this.setMessageExpireTime(msgExpireTime);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMessageExpireTime() {
+ return JobStealingCollisionSpi.this.getMessageExpireTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setStealingEnabled(boolean isStealingEnabled) {
+ JobStealingCollisionSpi.this.setStealingEnabled(isStealingEnabled);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isStealingEnabled() {
+ return JobStealingCollisionSpi.this.isStealingEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMaximumStealingAttempts(int maxStealingAttempts) {
+ JobStealingCollisionSpi.this.setMaximumStealingAttempts(maxStealingAttempts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMaximumStealingAttempts() {
+ return JobStealingCollisionSpi.this.getMaximumStealingAttempts();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java
index 9c49f70..8052936 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpiMBean.java
@@ -52,7 +52,7 @@ public interface JobStealingCollisionSpiMBean extends IgniteSpiManagementMBean {
@MXBeanDescription("Number of active jobs.")
public int getCurrentActiveJobsNumber();
- /*
+ /**
* Gets number of currently running (not {@code 'held}) jobs.
*
* @return Number of currently running (not {@code 'held}) jobs.
@@ -87,10 +87,10 @@ public interface JobStealingCollisionSpiMBean extends IgniteSpiManagementMBean {
/**
* Sets number of jobs that can be executed in parallel.
*
- * @param activeJobsTreshold Number of jobs that can be executed in parallel.
+ * @param activeJobsThreshold Number of jobs that can be executed in parallel.
*/
@MXBeanDescription("Number of jobs that can be executed in parallel.")
- public void setActiveJobsThreshold(int activeJobsTreshold);
+ public void setActiveJobsThreshold(int activeJobsThreshold);
/**
* Gets job count threshold at which this node will
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java
index 8b75220..67a47a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/noop/NoopCollisionSpi.java
@@ -59,6 +59,13 @@ public class NoopCollisionSpi extends IgniteSpiAdapter implements CollisionSpi {
}
/** {@inheritDoc} */
+ @Override public NoopCollisionSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(NoopCollisionSpi.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
index 9a6eb0e..47e81dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
@@ -37,6 +37,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.collision.CollisionContext;
import org.apache.ignite.spi.collision.CollisionExternalListener;
@@ -174,8 +175,7 @@ import org.apache.ignite.spi.collision.CollisionSpi;
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = true)
-public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi,
- PriorityQueueCollisionSpiMBean {
+public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements CollisionSpi {
/**
* Default number of parallel jobs allowed (set to number of cores times 2).
*/
@@ -245,49 +245,89 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
@LoggerResource
private IgniteLogger log;
- /** {@inheritDoc} */
- @Override public int getParallelJobsNumber() {
+ /**
+ * Gets number of jobs that can be executed in parallel.
+ *
+ * @return Number of jobs that can be executed in parallel.
+ */
+ public int getParallelJobsNumber() {
return parallelJobsNum;
}
- /** {@inheritDoc} */
+ /**
+ * Sets number of jobs that can be executed in parallel.
+ *
+ * @param parallelJobsNum Parallel jobs number.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setParallelJobsNumber(int parallelJobsNum) {
+ public PriorityQueueCollisionSpi setParallelJobsNumber(int parallelJobsNum) {
A.ensure(parallelJobsNum > 0, "parallelJobsNum > 0");
this.parallelJobsNum = parallelJobsNum;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getWaitingJobsNumber() {
+ /**
+ * Maximum number of jobs that are allowed to wait in waiting queue. If number
+ * of waiting jobs ever exceeds this number, excessive jobs will be rejected.
+ *
+ * @return Maximum allowed number of waiting jobs.
+ */
+ public int getWaitingJobsNumber() {
return waitJobsNum;
}
- /** {@inheritDoc} */
+ /**
+ * Maximum number of jobs that are allowed to wait in waiting queue. If number
+ * of waiting jobs ever exceeds this number, excessive jobs will be rejected.
+ *
+ * @param waitJobsNum Maximum jobs number.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setWaitingJobsNumber(int waitJobsNum) {
+ public PriorityQueueCollisionSpi setWaitingJobsNumber(int waitJobsNum) {
A.ensure(waitJobsNum >= 0, "waitJobsNum >= 0");
this.waitJobsNum = waitJobsNum;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getCurrentWaitJobsNumber() {
+ /**
+ * Gets current number of jobs that wait for the execution.
+ *
+ * @return Number of jobs that wait for execution.
+ */
+ public int getCurrentWaitJobsNumber() {
return waitingCnt;
}
- /** {@inheritDoc} */
- @Override public int getCurrentActiveJobsNumber() {
+ /**
+ * Gets current number of jobs that are active, i.e. {@code 'running + held'} jobs.
+ *
+ * @return Number of active jobs.
+ */
+ public int getCurrentActiveJobsNumber() {
return runningCnt + heldCnt;
}
- /** {@inheritDoc} */
- @Override public int getCurrentRunningJobsNumber() {
+ /**
+ * Gets number of currently running (not {@code 'held'}) jobs.
+ *
+ * @return Number of currently running (not {@code 'held'}) jobs.
+ */
+ public int getCurrentRunningJobsNumber() {
return runningCnt;
}
- /** {@inheritDoc} */
- @Override public int getCurrentHeldJobsNumber() {
+ /**
+ * Gets number of currently {@code 'held'} jobs.
+ *
+ * @return Number of currently {@code 'held'} jobs.
+ */
+ public int getCurrentHeldJobsNumber() {
return heldCnt;
}
@@ -298,10 +338,13 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
* If not provided, default value is {@code {@link #DFLT_PRIORITY_ATTRIBUTE_KEY}}.
*
* @param taskPriAttrKey Priority session attribute key.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setPriorityAttributeKey(String taskPriAttrKey) {
+ public PriorityQueueCollisionSpi setPriorityAttributeKey(String taskPriAttrKey) {
this.taskPriAttrKey = taskPriAttrKey;
+
+ return this;
}
/**
@@ -311,53 +354,102 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
* If not provided, default value is {@code {@link #DFLT_JOB_PRIORITY_ATTRIBUTE_KEY}}.
*
* @param jobPriAttrKey Job priority attribute key.
+ * @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
- public void setJobPriorityAttributeKey(String jobPriAttrKey) {
+ public PriorityQueueCollisionSpi setJobPriorityAttributeKey(String jobPriAttrKey) {
this.jobPriAttrKey = jobPriAttrKey;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public String getPriorityAttributeKey() {
+ /**
+ * Gets key name of task priority attribute.
+ *
+ * @return Key name of task priority attribute.
+ */
+ public String getPriorityAttributeKey() {
return taskPriAttrKey;
}
- /** {@inheritDoc} */
- @Override public String getJobPriorityAttributeKey() {
+ /**
+ * Gets key name of job priority attribute.
+ *
+ * @return Key name of job priority attribute.
+ */
+ public String getJobPriorityAttributeKey() {
return jobPriAttrKey;
}
- /** {@inheritDoc} */
- @Override public int getDefaultPriority() {
+ /**
+ * Gets default priority to use if a job does not have priority attribute
+ * set.
+ *
+ * @return Default priority to use if a task does not have priority
+ * attribute set.
+ */
+ public int getDefaultPriority() {
return dfltPri;
}
- /** {@inheritDoc} */
+ /**
+ * Sets default priority to use if a job does not have priority attribute set.
+ *
+ * @param priority Default priority.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setDefaultPriority(int dfltPri) {
- this.dfltPri = dfltPri;
+ public PriorityQueueCollisionSpi setDefaultPriority(int priority) {
+ this.dfltPri = priority;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getStarvationIncrement() {
+ /**
+ * Gets value to increment job priority by every time a lower priority job gets
+ * behind a higher priority job.
+ *
+ * @return Value to increment job priority by every time a lower priority job gets
+ * behind a higher priority job.
+ */
+ public int getStarvationIncrement() {
return starvationInc;
}
- /** {@inheritDoc} */
+ /**
+ * Sets value to increment job priority by every time a lower priority job gets
+ * behind a higher priority job.
+ *
+ * @param starvationInc Increment value.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setStarvationIncrement(int starvationInc) {
+ public PriorityQueueCollisionSpi setStarvationIncrement(int starvationInc) {
this.starvationInc = starvationInc;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public boolean isStarvationPreventionEnabled() {
+ /**
+ * Gets flag indicating whether job starvation prevention is enabled.
+ *
+ * @return Flag indicating whether job starvation prevention is enabled.
+ */
+ public boolean isStarvationPreventionEnabled() {
return preventStarvation;
}
- /** {@inheritDoc} */
+ /**
+ * Sets flag indicating whether job starvation prevention is enabled.
+ *
+ * @param preventStarvation Flag indicating whether job starvation prevention is enabled.
+ * @return {@code this} for chaining.
+ */
@IgniteSpiConfiguration(optional = true)
- @Override public void setStarvationPreventionEnabled(boolean preventStarvation) {
+ public PriorityQueueCollisionSpi setStarvationPreventionEnabled(boolean preventStarvation) {
this.preventStarvation = preventStarvation;
+
+ return this;
}
/** {@inheritDoc} */
@@ -386,7 +478,8 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
log.debug(configInfo("preventStarvation", preventStarvation));
}
- registerMBean(igniteInstanceName, this, PriorityQueueCollisionSpiMBean.class);
+ registerMBean(igniteInstanceName, new PriorityQueueCollisionSpiMBeanImpl(this),
+ PriorityQueueCollisionSpiMBean.class);
// Ack start.
if (log.isDebugEnabled())
@@ -563,11 +656,6 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
return Collections.singletonList(createSpiAttributeName(PRIORITY_ATTRIBUTE_KEY));
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PriorityQueueCollisionSpi.class, this);
- }
-
/**
* Returns (possibly shared) comparator fo sorting GridCollisionJobContextWrapper
* by priority.
@@ -581,6 +669,18 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
return priComp;
}
+ /** {@inheritDoc} */
+ @Override public PriorityQueueCollisionSpi setName(String name) {
+ super.setName(name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PriorityQueueCollisionSpi.class, this);
+ }
+
/**
* Comparator for by priority comparison of collision contexts.
*/
@@ -630,4 +730,96 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
return originalIdx;
}
}
+
+ /**
+ * MBean implementation for PriorityQueueCollisionSpi.
+ */
+ private class PriorityQueueCollisionSpiMBeanImpl extends IgniteSpiMBeanAdapter
+ implements PriorityQueueCollisionSpiMBean {
+ /** @param spiAdapter SPI adapter passed to the {@link IgniteSpiMBeanAdapter} constructor. */
+ PriorityQueueCollisionSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
+ super(spiAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getParallelJobsNumber() {
+ return PriorityQueueCollisionSpi.this.getParallelJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @IgniteSpiConfiguration(optional = true)
+ @Override public void setParallelJobsNumber(int parallelJobsNum) {
+ PriorityQueueCollisionSpi.this.setParallelJobsNumber(parallelJobsNum);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getWaitingJobsNumber() {
+ return PriorityQueueCollisionSpi.this.getWaitingJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWaitingJobsNumber(int waitJobsNum) {
+ PriorityQueueCollisionSpi.this.setWaitingJobsNumber(waitJobsNum);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getPriorityAttributeKey() {
+ return PriorityQueueCollisionSpi.this.getPriorityAttributeKey();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getJobPriorityAttributeKey() {
+ return PriorityQueueCollisionSpi.this.getJobPriorityAttributeKey();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getDefaultPriority() {
+ return PriorityQueueCollisionSpi.this.getDefaultPriority();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDefaultPriority(int dfltPri) {
+ PriorityQueueCollisionSpi.this.setDefaultPriority(dfltPri);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getStarvationIncrement() {
+ return PriorityQueueCollisionSpi.this.getStarvationIncrement();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setStarvationIncrement(int starvationInc) {
+ PriorityQueueCollisionSpi.this.setStarvationIncrement(starvationInc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isStarvationPreventionEnabled() {
+ return PriorityQueueCollisionSpi.this.isStarvationPreventionEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setStarvationPreventionEnabled(boolean preventStarvation) {
+ PriorityQueueCollisionSpi.this.setStarvationPreventionEnabled(preventStarvation);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentWaitJobsNumber() {
+ return PriorityQueueCollisionSpi.this.getCurrentWaitJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentActiveJobsNumber() {
+ return PriorityQueueCollisionSpi.this.getCurrentActiveJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentRunningJobsNumber() {
+ return PriorityQueueCollisionSpi.this.getCurrentRunningJobsNumber();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCurrentHeldJobsNumber() {
+ return PriorityQueueCollisionSpi.this.getCurrentHeldJobsNumber();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java
index b7f8ba1..e6a8412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpiMBean.java
@@ -41,7 +41,7 @@ public interface PriorityQueueCollisionSpiMBean extends IgniteSpiManagementMBean
@MXBeanDescription("Number of active jobs.")
public int getCurrentActiveJobsNumber();
- /*
+ /**
* Gets number of currently running (not {@code 'held}) jobs.
*
* @return Number of currently running (not {@code 'held}) jobs.