You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/22 17:41:58 UTC
[26/50] [abbrv] ignite git commit: ignite-638: Implement
IgniteSemaphore data structure
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 2fd40f6..85a26ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -27,7 +27,9 @@ import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -514,6 +516,277 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/**
* @throws Exception If failed.
*/
+ public void testSemaphoreTopologyChange() throws Exception {
+
+ try (IgniteSemaphore semaphore = grid(0).semaphore(STRUCTURE_NAME, 20, true, true)) {
+ try {
+ Ignite g = startGrid(NEW_GRID_NAME);
+
+ assert g.semaphore(STRUCTURE_NAME, 20, true, true).availablePermits() == 20;
+
+ g.semaphore(STRUCTURE_NAME, 20, true, true).acquire(10);
+
+ stopGrid(NEW_GRID_NAME);
+
+ assert grid(0).semaphore(STRUCTURE_NAME, 20, true, true).availablePermits() == 10;
+ }
+ finally {
+ grid(0).semaphore(STRUCTURE_NAME, 20, true, true).close();
+ }
+ }
+
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreConstantTopologyChange() throws Exception {
+ try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 10, false, true)) {
+ try {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ String name = UUID.randomUUID().toString();
+
+ try {
+ Ignite g = startGrid(name);
+
+ assert g.semaphore(STRUCTURE_NAME, 10, false, false) != null;
+ }
+ finally {
+ if (i != TOP_CHANGE_CNT - 1)
+ stopGrid(name);
+ }
+ }
+ }
+ catch (Exception e) {
+ throw F.wrap(e);
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+
+ int val = s.availablePermits();
+
+ while (!fut.isDone()) {
+ assert s.availablePermits() == val;
+
+ s.acquire();
+
+ assert s.availablePermits() == val - 1;
+
+ s.release();
+ }
+
+ fut.get();
+
+ for (Ignite g : G.allGrids())
+ assert g.semaphore(STRUCTURE_NAME, 0, false, true).availablePermits() == val;
+ }
+ finally {
+ grid(0).semaphore(STRUCTURE_NAME, 0, false, true).close();
+ }
+ }
+ }
+
+ /**
+ * This method tests if permits are successfully reassigned when a node fails in failoverSafe mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreConstantTopologyChangeFailoverSafe() throws Exception {
+ try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true)) {
+ try {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ String name = UUID.randomUUID().toString();
+
+ try {
+ Ignite g = startGrid(name);
+
+ final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true);
+
+ assertNotNull(sem);
+
+ sem.acquire();
+
+ if (i == TOP_CHANGE_CNT - 1) {
+ sem.release();
+ }
+ }
+ finally {
+ if (i != TOP_CHANGE_CNT - 1) {
+ stopGrid(name);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ throw F.wrap(e);
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+
+ while (!fut.isDone()) {
+ s.release();
+
+ s.acquire();
+ }
+
+ fut.get();
+
+ int val = s.availablePermits();
+
+ assertEquals(val, TOP_CHANGE_CNT);
+
+ for (Ignite g : G.allGrids())
+ assertEquals(val, g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true).availablePermits());
+ }
+ finally {
+ grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true).close();
+ }
+ }
+ }
+
+ /**
+ * Tests that permits are reassigned when multiple nodes holding them fail in failoverSafe mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreConstantMultipleTopologyChangeFailoverSafe() throws Exception {
+ final int numPermits = 3;
+
+ try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, numPermits, true, true)) {
+ try {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ Collection<String> names = new GridLeanSet<>(numPermits);
+
+ try {
+ for (int j = 0; j < numPermits; j++) {
+ String name = UUID.randomUUID().toString();
+
+ names.add(name);
+
+ Ignite g = startGrid(name);
+
+ final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, numPermits, true, true);
+
+ assertNotNull(sem);
+
+ sem.acquire();
+
+ if (i == TOP_CHANGE_CNT - 1) {
+ sem.release();
+ }
+ }
+ }
+ finally {
+ if (i != TOP_CHANGE_CNT - 1)
+ for (String name : names) {
+ stopGrid(name);
+
+ awaitPartitionMapExchange();
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ throw F.wrap(e);
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+
+ while (!fut.isDone()) {
+ s.release();
+
+ s.acquire();
+ }
+
+ fut.get();
+
+ int val = s.availablePermits();
+
+ assertEquals(numPermits, val);
+
+ for (Ignite g : G.allGrids())
+ assertEquals(val, g.semaphore(STRUCTURE_NAME, 0, true, true).availablePermits());
+ }
+ finally {
+ grid(0).semaphore(STRUCTURE_NAME, 0, true, true).close();
+ }
+ }
+ }
+
+ /**
+ * Tests that acquiring fails with an exception and the semaphore becomes broken when a node fails in non-failoverSafe mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreConstantTopologyChangeNotFailoverSafe() throws Exception {
+ try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 1, false, true)) {
+ try {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < 2; i++) {
+ String name = UUID.randomUUID().toString();
+
+ try {
+ Ignite g = startGrid(name);
+
+ final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true);
+
+ assertNotNull(sem);
+
+ if (i != 1) {
+ sem.acquire();
+ }
+
+ }
+ finally {
+ if (i != 1) {
+ stopGrid(name);
+ }
+ }
+ }
+
+ }
+ catch (Exception e) {
+ throw F.wrap(e);
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+
+ while (s.availablePermits() != 0) {
+ // Busy-wait until the single permit has been acquired by a topology-change thread.
+ }
+
+ try {
+ s.acquire();
+ fail("In non-FailoverSafe mode IgniteInterruptedException must be thrown.");
+ }
+ catch (Exception e) {
+ assert (e instanceof IgniteInterruptedException);
+ }
+
+ assertTrue(s.isBroken());
+
+ fut.get();
+ }
+ finally {
+ grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true).close();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCountDownLatchConstantTopologyChange() throws Exception {
try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
try {
@@ -928,4 +1201,4 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index 989f75f..bf6dcda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -267,6 +268,62 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
/**
* @throws Exception If failed.
*/
+ public void testSemaphore() throws Exception {
+ Ignite clientNode = clientIgnite();
+ Ignite srvNode = serverNode();
+
+ testSemaphore(clientNode, srvNode);
+ testSemaphore(srvNode, clientNode);
+ }
+
+ /**
+ * @param creator Creator node.
+ * @param other Other node.
+ * @throws Exception If failed.
+ */
+ private void testSemaphore(Ignite creator, final Ignite other) throws Exception {
+ assertNull(creator.semaphore("semaphore1", 1, true, false));
+ assertNull(other.semaphore("semaphore1", 1, true, false));
+
+ try (IgniteSemaphore semaphore = creator.semaphore("semaphore1", -1, true, true)) {
+ assertNotNull(semaphore);
+
+ assertEquals(-1, semaphore.availablePermits());
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.sleep(1000);
+
+ IgniteSemaphore semaphore0 = other.semaphore("semaphore1", -1, true, false);
+
+ assertEquals(-1, semaphore0.availablePermits());
+
+ log.info("Release semaphore.");
+
+ semaphore0.release(2);
+
+ return null;
+ }
+ });
+
+ log.info("Acquire semaphore.");
+
+ assertTrue(semaphore.tryAcquire(1, 5000, TimeUnit.MILLISECONDS));
+
+ log.info("Finished wait.");
+
+ fut.get();
+
+ assertEquals(0, semaphore.availablePermits());
+ }
+
+ assertNull(creator.semaphore("semaphore1", 1, true, false));
+ assertNull(other.semaphore("semaphore1", 1, true, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testQueue() throws Exception {
Ignite clientNode = clientIgnite();
Ignite srvNode = serverNode();
@@ -343,4 +400,4 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
return ignite;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
index f5305a2..4a21765 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMemoryMode;
@@ -239,7 +240,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
private void testUniqueName(final boolean singleGrid) throws Exception {
final String name = IgniteUuid.randomUuid().toString();
- final int DS_TYPES = 7;
+ final int DS_TYPES = 8;
final int THREADS = DS_TYPES * 3;
@@ -314,6 +315,12 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
break;
+ case 7:
+ log.info("Create atomic semaphore, grid: " + ignite.name());
+
+ res = ignite.semaphore(name, 0, false, true);
+
+ break;
default:
fail();
@@ -352,7 +359,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
res instanceof IgniteAtomicStamped ||
res instanceof IgniteCountDownLatch ||
res instanceof IgniteQueue ||
- res instanceof IgniteSet);
+ res instanceof IgniteSet ||
+ res instanceof IgniteSemaphore);
log.info("Data structure created: " + dataStructure);
@@ -371,4 +379,4 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
dataStructure.close();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
new file mode 100644
index 0000000..e60aed3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ * Cache semaphore self test.
+ */
+public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstractTest
+ implements Externalizable {
+ /** */
+ private static final int NODES_CNT = 4;
+
+ /** */
+ protected static final int THREADS_CNT = 5;
+
+ /** */
+ private static final Random RND = new Random();
+
+ /** */
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return NODES_CNT;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSemaphore() throws Exception {
+ checkSemaphore();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailover() throws Exception {
+ if (atomicsCacheMode() == LOCAL)
+ return;
+
+ checkFailover(true);
+ checkFailover(false);
+ }
+
+ /**
+ * @param failoverSafe Failover safe flag.
+ * @throws Exception If failed.
+ */
+ private void checkFailover(boolean failoverSafe) throws Exception {
+ IgniteEx g = startGrid(NODES_CNT + 1);
+
+ // For vars locality.
+ {
+ // Ensure not exists.
+ assert g.semaphore("sem", 2, failoverSafe, false) == null;
+
+ IgniteSemaphore sem = g.semaphore(
+ "sem",
+ 2,
+ failoverSafe,
+ true);
+
+ sem.acquire(2);
+
+ assert !sem.tryAcquire();
+ assertEquals(
+ 0,
+ sem.availablePermits());
+ }
+
+ Ignite g0 = grid(0);
+
+ final IgniteSemaphore sem0 = g0.semaphore(
+ "sem",
+ -10,
+ false,
+ false);
+
+ assert !sem0.tryAcquire();
+ assertEquals(0, sem0.availablePermits());
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ sem0.acquire();
+
+ info("Acquired in separate thread.");
+
+ return null;
+ }
+ },
+ 1);
+
+ Thread.sleep(100);
+
+ g.close();
+
+ try {
+ fut.get(500);
+ }
+ catch (IgniteCheckedException e) {
+ if (!failoverSafe && e.hasCause(InterruptedException.class))
+ info("Ignored expected exception: " + e);
+ else
+ throw e;
+ }
+
+ sem0.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkSemaphore() throws Exception {
+ // Test API.
+ checkAcquire();
+
+ checkRelease();
+
+ checkFailoverSafe();
+
+ // Test main functionality.
+ IgniteSemaphore semaphore1 = grid(0).semaphore("semaphore", -2, true, true);
+
+ assertEquals(-2, semaphore1.availablePermits());
+
+ IgniteCompute comp = grid(0).compute().withAsync();
+
+ comp.call(new IgniteCallable<Object>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @LoggerResource
+ private IgniteLogger log;
+
+ @Nullable @Override public Object call() throws Exception {
+ // Test semaphore in multiple threads on each node.
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+ new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ IgniteSemaphore semaphore = ignite.semaphore("semaphore", -2, true, true);
+
+ assert semaphore != null && semaphore.availablePermits() == -2;
+
+ log.info("Thread is going to wait on semaphore: " + Thread.currentThread().getName());
+
+ assert semaphore.tryAcquire(1, 1, MINUTES);
+
+ log.info("Thread is again runnable: " + Thread.currentThread().getName());
+
+ semaphore.release();
+
+ return null;
+ }
+ },
+ 5,
+ "test-thread"
+ );
+
+ fut.get();
+
+ return null;
+ }
+ });
+
+ IgniteFuture<Object> fut = comp.future();
+
+ Thread.sleep(3000);
+
+ semaphore1.release(2);
+
+ assert semaphore1.availablePermits() == 0;
+
+ semaphore1.release(1);
+
+ // Ensure there are no hangs.
+ fut.get();
+
+ // Test operations on removed semaphore.
+ semaphore1.close();
+
+ checkRemovedSemaphore(semaphore1);
+ }
+
+ /**
+ * @param semaphore Semaphore.
+ * @throws Exception If failed.
+ */
+ protected void checkRemovedSemaphore(final IgniteSemaphore semaphore) throws Exception {
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return semaphore.removed();
+ }
+ }, 5000);
+
+ assert semaphore.removed();
+ }
+
+ /**
+ * Checks only that the failoverSafe flag of a newly created semaphore is initialized as requested.
+ * For tests covering failure recovery see {@code GridCachePartitionedNodeFailureSelfTest}.
+ *
+ * @throws Exception If failed.
+ */
+ private void checkFailoverSafe() throws Exception {
+ // Create with failoverSafe = true, verify the flag, then repeat with false.
+ IgniteSemaphore semaphore = createSemaphore("rmv", 5, true);
+
+ assert semaphore.isFailoverSafe();
+
+ removeSemaphore("rmv");
+
+ IgniteSemaphore semaphore1 = createSemaphore("rmv1", 5, false);
+
+ assert !semaphore1.isFailoverSafe();
+
+ removeSemaphore("rmv1");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkAcquire() throws Exception {
+ // Check only 'false' cases here (more permits requested than available). Successful acquire is tested over the grid.
+ IgniteSemaphore semaphore = createSemaphore("acquire", 5, false);
+
+ assert !semaphore.tryAcquire(10);
+ assert !semaphore.tryAcquire(10, 10, MICROSECONDS);
+
+ removeSemaphore("acquire");
+ }
+
+ /**
+ * @throws Exception Exception.
+ */
+ private void checkRelease() throws Exception {
+ IgniteSemaphore semaphore = createSemaphore("release", 5, false);
+
+ semaphore.release();
+ assert semaphore.availablePermits() == 6;
+
+ semaphore.release(2);
+ assert semaphore.availablePermits() == 8;
+
+ assert semaphore.drainPermits() == 8;
+ assert semaphore.availablePermits() == 0;
+
+ removeSemaphore("release");
+
+ checkRemovedSemaphore(semaphore);
+
+ IgniteSemaphore semaphore2 = createSemaphore("release2", -5, false);
+
+ semaphore2.release();
+
+ assert semaphore2.availablePermits() == -4;
+
+ semaphore2.release(2);
+
+ assert semaphore2.availablePermits() == -2;
+
+ assert semaphore2.drainPermits() == -2;
+
+ assert semaphore2.availablePermits() == 0;
+
+ removeSemaphore("release2");
+
+ checkRemovedSemaphore(semaphore2);
+ }
+
+ /**
+ * @param semaphoreName Semaphore name.
+ * @param numPermissions Initial number of permits.
+ * @param failoverSafe Failover safe flag.
+ * @return New semaphore.
+ * @throws Exception If failed.
+ */
+ private IgniteSemaphore createSemaphore(String semaphoreName, int numPermissions, boolean failoverSafe)
+ throws Exception {
+ IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, numPermissions, failoverSafe, true);
+
+ // Test initialization.
+ assert semaphoreName.equals(semaphore.name());
+ assert semaphore.availablePermits() == numPermissions;
+ assert semaphore.getQueueLength() == 0;
+ assert semaphore.isFailoverSafe() == failoverSafe;
+
+ return semaphore;
+ }
+
+ /**
+ * @param semaphoreName Semaphore name.
+ * @throws Exception If failed.
+ */
+ private void removeSemaphore(String semaphoreName)
+ throws Exception {
+ IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 10, false, true);
+
+ assert semaphore != null;
+
+ if (semaphore.availablePermits() < 0)
+ semaphore.release(-semaphore.availablePermits());
+
+ // Remove semaphore on random node.
+ IgniteSemaphore semaphore0 = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 0, false, true);
+
+ assertNotNull(semaphore0);
+
+ semaphore0.close();
+
+ // Ensure semaphore is removed on all nodes.
+ for (Ignite g : G.allGrids())
+ assertNull(((IgniteKernal)g).context().dataStructures().semaphore(semaphoreName, 10, true, false));
+
+ checkRemovedSemaphore(semaphore);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSemaphoreMultinode1() throws Exception {
+ if (gridCount() == 1)
+ return;
+
+ IgniteSemaphore semaphore = grid(0).semaphore("s1", 0, true, true);
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < gridCount(); i++) {
+ final Ignite ignite = grid(i);
+
+ futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteSemaphore semaphore = ignite.semaphore("s1", 0, true, false);
+
+ assertNotNull(semaphore);
+
+ boolean wait = semaphore.tryAcquire(30_000, MILLISECONDS);
+
+ assertTrue(wait);
+
+ return null;
+ }
+ }));
+ }
+
+ for (int i = 0; i < 10; i++)
+ semaphore.release();
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get(30_000);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
new file mode 100644
index 0000000..a516fc1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.local;
+
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ * Semaphore self test run with the LOCAL atomics cache mode on a single grid.
+ */
+public class IgniteLocalSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode atomicsCacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testSemaphore() throws Exception {
+ // Test main functionality.
+ IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true);
+
+ assertNotNull(semaphore);
+
+ assertEquals(-2, semaphore.availablePermits());
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+ new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true);
+
+ assert semaphore != null && semaphore.availablePermits() == -2;
+
+ info("Thread is going to wait on semaphore: " + Thread.currentThread().getName());
+
+ assert semaphore.tryAcquire(1, 1, MINUTES);
+
+ info("Thread is again runnable: " + Thread.currentThread().getName());
+
+ semaphore.release();
+
+ return null;
+ }
+ },
+ THREADS_CNT,
+ "test-thread"
+ );
+
+ Thread.sleep(3000);
+
+ assert semaphore.availablePermits() == -2;
+
+ semaphore.release(2);
+
+ assert semaphore.availablePermits() == 0;
+
+ semaphore.release();
+
+ // Ensure there are no hangs.
+ fut.get();
+
+ // Test operations on removed semaphore.
+ IgniteSemaphore semaphore0 = grid(0).semaphore("semaphore", 0, false, false);
+
+ assertNotNull(semaphore0);
+
+ semaphore0.close();
+
+ checkRemovedSemaphore(semaphore0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
new file mode 100644
index 0000000..c302cad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Semaphore self test run with the PARTITIONED atomics cache mode.
+ */
+public class IgnitePartitionedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode atomicsCacheMode() {
+ return PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
index 69de7cd..d0131d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
@@ -50,4 +50,4 @@ public class GridCacheReplicatedDataStructuresFailoverSelfTest
@Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
return TRANSACTIONAL;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
new file mode 100644
index 0000000..f58754f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.replicated;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * {@link IgniteSemaphoreAbstractSelfTest} variant whose {@link #atomicsCacheMode()} is {@link CacheMode#REPLICATED}.
+ */
+public class IgniteReplicatedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode atomicsCacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
index 2fa8940..d4ca9a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CollectionConfiguration;
@@ -60,6 +61,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
/** Count down latch name. */
private static final String TEST_LATCH_NAME = "test-latch";
+ /** Semaphore name. */
+ private static final String TEST_SEMAPHORE_NAME = "test-semaphore";
+
/** */
private static final CollectionConfiguration colCfg = new CollectionConfiguration();
@@ -69,6 +73,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
/** Count down latch initial count. */
private static final int LATCH_INIT_CNT = 1000;
+ /** Semaphore initial count. */
+ private static final int SEMAPHORE_INIT_CNT = 1000;
+
/** */
private static final boolean LONG = false;
@@ -88,6 +95,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
private static final boolean LATCH = true;
/** */
+ private static final boolean SEMAPHORE = true;
+
+ /** */
private GridCacheDataStructuresLoadTest() {
// No-op
}
@@ -95,210 +105,247 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
/** Atomic long write closure. */
private final CIX1<Ignite> longWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- al.addAndGet(RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ al.addAndGet(RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic long read closure. */
private final CIX1<Ignite> longReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- al.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ al.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Atomic reference write closure. */
private final CIX1<Ignite> refWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME,
- null, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME,
+ null, true);
- for (int i = 0; i < operationsPerTx; i++) {
- ar.set(RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ ar.set(RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic reference read closure. */
private final CIX1<Ignite> refReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME, null,
- true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME, null,
+ true);
- for (int i = 0; i < operationsPerTx; i++) {
- ar.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ ar.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Atomic sequence write closure. */
private final CIX1<Ignite> seqWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.addAndGet(RAND.nextInt(MAX_INT) + 1);
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.addAndGet(RAND.nextInt(MAX_INT) + 1);
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic sequence read closure. */
private final CIX1<Ignite> seqReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Atomic stamped write closure. */
private final CIX1<Ignite> stampWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
- 0, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
+ 0, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Atomic stamped read closure. */
private final CIX1<Ignite> stampReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
- 0, 0, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
+ 0, 0, true);
- for (int i = 0; i < operationsPerTx; i++) {
- as.get();
+ for (int i = 0; i < operationsPerTx; i++) {
+ as.get();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Queue write closure. */
private final CIX1<Ignite> queueWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
+ @Override public void applyx(Ignite ignite) {
+ IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
- for (int i = 0; i < operationsPerTx; i++) {
- q.put(RAND.nextInt(MAX_INT));
+ for (int i = 0; i < operationsPerTx; i++) {
+ q.put(RAND.nextInt(MAX_INT));
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Queue read closure. */
private final CIX1<Ignite> queueReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
+ @Override public void applyx(Ignite ignite) {
+ IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
- for (int i = 0; i < operationsPerTx; i++) {
- q.peek();
+ for (int i = 0; i < operationsPerTx; i++) {
+ q.peek();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
/** Count down latch write closure. */
private final CIX1<Ignite> latchWriteClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
- for (int i = 0; i < operationsPerTx; i++) {
- l.countDown();
+ for (int i = 0; i < operationsPerTx; i++) {
+ l.countDown();
- long cnt = writes.incrementAndGet();
+ long cnt = writes.incrementAndGet();
- if (cnt % WRITE_LOG_MOD == 0)
- info("Performed " + cnt + " writes.");
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
}
- }
- };
+ };
/** Count down latch read closure. */
private final CIX1<Ignite> latchReadClos =
new CIX1<Ignite>() {
- @Override public void applyx(Ignite ignite) {
- IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
+ @Override public void applyx(Ignite ignite) {
+ IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
- for (int i = 0; i < operationsPerTx; i++) {
- l.count();
+ for (int i = 0; i < operationsPerTx; i++) {
+ l.count();
- long cnt = reads.incrementAndGet();
+ long cnt = reads.incrementAndGet();
- if (cnt % READ_LOG_MOD == 0)
- info("Performed " + cnt + " reads.");
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
}
- }
- };
+ };
+
+ /** Semaphore write closure: per call, alternates release() (even i) and acquire() (odd i); each counts as one write. */
+ private final CIX1<Ignite> semaphoreWriteClos =
+ new CIX1<Ignite>() {
+ @Override public void applyx(Ignite ignite) {
+ IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true); // failoverSafe = false, create = true.
+
+ for (int i = 0; i < operationsPerTx; i++) {
+ if ((i % 2) == 0)
+ s.release();
+ else
+ s.acquire();
+
+ long cnt = writes.incrementAndGet();
+
+ if (cnt % WRITE_LOG_MOD == 0)
+ info("Performed " + cnt + " writes.");
+ }
+ }
+ };
+
+ /** Semaphore read closure: calls availablePermits() operationsPerTx times per call; each counts as one read. */
+ private final CIX1<Ignite> semaphoreReadClos =
+ new CIX1<Ignite>() {
+ @Override public void applyx(Ignite ignite) {
+ IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true); // failoverSafe = false, create = true.
+
+ for (int i = 0; i < operationsPerTx; i++) {
+ s.availablePermits();
+
+ long cnt = reads.incrementAndGet();
+
+ if (cnt % READ_LOG_MOD == 0)
+ info("Performed " + cnt + " reads.");
+ }
+ }
+ };
/**
* @param args Arguments.
@@ -362,6 +409,14 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
test.loadTestIgnite(test.latchWriteClos, test.latchReadClos);
}
+
+ System.gc();
+
+ if (SEMAPHORE) {
+ info("Testing semaphore...");
+
+ test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos);
+ }
}
}
@@ -407,7 +462,7 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
@Nullable @Override public Object call() throws Exception {
long start = System.currentTimeMillis();
- while(!done.get()) {
+ while (!done.get()) {
if (tx) {
try (Transaction tx = ignite.transactions().txStart()) {
readClos.apply(ignite);
@@ -447,4 +502,4 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index bfeafdf..1940077 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteFileSystem;
@@ -313,6 +314,15 @@ public class IgniteMock implements Ignite {
}
/** {@inheritDoc} */
+ @Nullable @Override public IgniteSemaphore semaphore(String name,
+ int cnt,
+ boolean failoverSafe,
+ boolean create)
+ {
+ return null; // Mock provides no semaphore: every argument is ignored and null is always returned.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public <T> IgniteQueue<T> queue(String name,
int cap,
CollectionConfiguration cfg)
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index 3eb9d98..45b82ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
@@ -535,6 +536,12 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
+ @Override public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe,
+ boolean create) throws IgniteException {
+ throw new UnsupportedOperationException("Operation isn't supported yet."); // Rejected unconditionally; no argument is inspected.
+ }
+
+ /** {@inheritDoc} */
@Override public <T> IgniteQueue<T> queue(String name, int cap,
@Nullable CollectionConfiguration cfg) throws IgniteException {
throw new UnsupportedOperationException("Operation isn't supported yet.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 7740907..c5cde89 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -406,6 +406,18 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
}
/** {@inheritDoc} */
+ @Nullable @Override public IgniteSemaphore semaphore(String name,
+ int cnt,
+ boolean failoverSafe,
+ boolean create)
+ {
+ assert g != null; // With assertions enabled, fail fast if the delegate g is unset.
+
+ return g.semaphore(name, cnt, // Pure delegation: all four arguments are forwarded unchanged to g.
+ failoverSafe, create);
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public <T> IgniteQueue<T> queue(String name,
int cap,
CollectionConfiguration cfg)