You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/10/23 18:10:44 UTC
[3/6] ignite git commit: Fixes formatting issues;
Fixes formatting issues;
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/77f9deb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/77f9deb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/77f9deb7
Branch: refs/heads/ignite-638
Commit: 77f9deb7f79acb04b73994e83efd4539646073e6
Parents: be332a8
Author: Vladisav Jelisavcic <vl...@gmail.com>
Authored: Sat Oct 3 12:44:02 2015 +0200
Committer: vladisav <vl...@gmail.com>
Committed: Sat Oct 3 12:44:02 2015 +0200
----------------------------------------------------------------------
.../datastructures/GridCacheSemaphoreImpl.java | 158 +++++++++++--------
.../datastructures/GridCacheSemaphoreState.java | 12 +-
2 files changed, 97 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/77f9deb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 17efc61..d2a966f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -96,30 +96,61 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
abstract class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
+ /** Thread map. */
protected final ConcurrentMap<Thread, Integer> threadMap;
+
+ /** Total number of threads currently waiting on this semaphore. */
protected int totalWaiters;
- Sync(int permits) {
+ protected Sync(int permits) {
setState(permits);
threadMap = new ConcurrentHashMap<>();
}
+ /**
+ * Sets the estimate of the total current number of threads waiting on this semaphore. This method should only
+ * be called in {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+ *
+ * @param waiters Thread count.
+ */
protected synchronized void setWaiters(int waiters) {
totalWaiters = waiters;
}
+ /**
+ * Gets the number of waiting threads.
+ *
+ * @return Number of threads waiting at this semaphore.
+ */
public int getWaiters() {
return totalWaiters;
}
+ /**
+ * Sets the number of permits currently available on this semaphore. This method should only be used in
+ * {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+ *
+ * @param permits Number of permits available at this semaphore.
+ */
final synchronized void setPermits(int permits) {
setState(permits);
}
+ /**
+ * Gets the number of permits currently available.
+ *
+ * @return Number of permits available at this semaphore.
+ */
final int getPermits() {
return getState();
}
+ /**
+ * This method is used by the AQS to test if the current thread should block or not.
+ *
+ * @param acquires Number of permits to acquire.
+ * @return Negative number if thread should block, non-negative if thread successfully acquires permits.
+ */
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
@@ -137,7 +168,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- protected final boolean tryReleaseShared(int releases) {
+ /** {@inheritDoc} */
+ @Override protected final boolean tryReleaseShared(int releases) {
// Check if some other node updated the state.
// This method is called with release==0 only when trying to wake through update.
if (releases == 0)
@@ -156,20 +188,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- final void reducePermits(int reductions) {
- for (; ; ) {
- int current = getState();
-
- int next = current - reductions;
-
- if (next > current) // underflow
- throw new Error("Permit count underflow");
-
- if (compareAndSetGlobalState(current, next))
- return;
- }
- }
-
+ /**
+ * This method is used internally to implement {@linkplain GridCacheSemaphoreImpl#drainPermits()}.
+ *
+ * @return Number of permits to drain.
+ */
final int drainPermits() {
for (; ; ) {
@@ -180,12 +203,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
+ /**
+ * This method is used when thread blocks on this semaphore to synchronize the waiting thread counter across all
+ * nodes.
+ */
protected void getAndIncWaitingCount() {
try {
CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
+ @Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -221,12 +247,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
+ /**
+ * This method is used for synchronizing the semaphore state across all nodes.
+ */
protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
try {
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
+ @Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -281,7 +309,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
super(permits);
}
- protected int tryAcquireShared(int acquires) {
+ /** {@inheritDoc} */
+ @Override protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
@@ -296,7 +325,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
super(permits);
}
- protected int tryAcquireShared(int acquires) {
+ /** {@inheritDoc} */
+ @Override protected int tryAcquireShared(int acquires) {
for (; ; ) {
if (hasQueuedPredecessors())
return -1;
@@ -355,8 +385,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
try {
sync = CU.outTx(
retryTopologySafe(new Callable<Sync>() {
- @Override
- public Sync call() throws Exception {
+ @Override public Sync call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -428,18 +457,18 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync.releaseShared(0);
}
- @Override
- public void needCheckNotRemoved() {
+ /** {@inheritDoc} */
+ @Override public void needCheckNotRemoved() {
// No-op.
}
- @Override
- public void acquire() throws IgniteException {
+ /** {@inheritDoc} */
+ @Override public void acquire() throws IgniteException {
acquire(1);
}
- @Override
- public void acquire(int permits) throws IgniteInterruptedException {
+ /** {@inheritDoc} */
+ @Override public void acquire(int permits) throws IgniteInterruptedException {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -455,8 +484,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void acquireUninterruptibly() {
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly() {
try {
initializeSemaphore();
@@ -467,8 +496,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void acquireUninterruptibly(int permits) {
+ /** {@inheritDoc} */
+ @Override public void acquireUninterruptibly(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
@@ -480,15 +509,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public int availablePermits() {
+ /** {@inheritDoc} */
+ @Override public int availablePermits() {
int ret;
try {
initializeSemaphore();
ret = CU.outTx(
retryTopologySafe(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
+ @Override public Integer call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semaphoreView.get(key);
@@ -512,8 +540,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return ret;
}
- @Override
- public int drainPermits() {
+ /** {@inheritDoc} */
+ @Override public int drainPermits() {
try {
initializeSemaphore();
@@ -524,8 +552,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire() {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire() {
try {
initializeSemaphore();
@@ -536,8 +564,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
try {
initializeSemaphore();
@@ -551,13 +579,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void release() {
+ /** {@inheritDoc} */
+ @Override public void release() {
release(1);
}
- @Override
- public void release(int permits) {
+ /** {@inheritDoc} */
+ @Override public void release(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -570,8 +598,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire(int permits) {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits) {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
@@ -584,8 +612,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+ /** {@inheritDoc} */
+ @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
A.ensure(permits >= 0, "Number of permits must be non-negative.");
try {
initializeSemaphore();
@@ -600,13 +628,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public boolean isFair() {
- return false;
+ /** {@inheritDoc} */
+ @Override public boolean isFair() {
+ return isFair;
}
- @Override
- public boolean hasQueuedThreads() {
+ /** {@inheritDoc} */
+ @Override public boolean hasQueuedThreads() {
try {
initializeSemaphore();
@@ -617,8 +645,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public int getQueueLength() {
+ /** {@inheritDoc} */
+ @Override public int getQueueLength() {
try {
initializeSemaphore();
@@ -629,22 +657,22 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
}
}
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx.kernalContext());
out.writeUTF(name);
}
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
IgniteBiTuple<GridKernalContext, String> t = stash.get();
t.set1((GridKernalContext)in.readObject());
t.set2(in.readUTF());
}
- @Override
- public void close() {
+ /** {@inheritDoc} */
+ @Override public void close() {
if (!rmvd) {
try {
ctx.kernalContext().dataStructures().removeSemaphore(name);
http://git-wip-us.apache.org/repos/asf/ignite/blob/77f9deb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index a02b7c9..e25649e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -87,16 +87,14 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* {@inheritDoc}
*/
- @Override
- public Object clone() throws CloneNotSupportedException {
+ @Override public Object clone() throws CloneNotSupportedException {
return super.clone();
}
/**
* {@inheritDoc}
*/
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(cnt);
out.writeInt(waiters);
out.writeBoolean(fair);
@@ -105,8 +103,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* {@inheritDoc}
*/
- @Override
- public void readExternal(ObjectInput in) throws IOException {
+ @Override public void readExternal(ObjectInput in) throws IOException {
cnt = in.readInt();
waiters = in.readInt();
fair = in.readBoolean();
@@ -115,8 +112,7 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
/**
* {@inheritDoc}
*/
- @Override
- public String toString() {
+ @Override public String toString() {
return S.toString(GridCacheSemaphoreState.class, this);
}
}