You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/14 19:59:39 UTC
[2/6] incubator-ignite git commit: ignite-1007 Race in data structures processor
ignite-1007 Race in data structures processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/811872ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/811872ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/811872ce
Branch: refs/heads/ignite-484-1
Commit: 811872ce692666fe8c77235f175b7ec15f717d30
Parents: 5160088
Author: agura <ag...@gridgain.com>
Authored: Fri Jun 12 18:36:58 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jun 12 18:36:58 2015 +0300
----------------------------------------------------------------------
.../datastructures/DataStructuresProcessor.java | 67 +++++++++++++++++++-
1 file changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/811872ce/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index aa3bfe2..473a2ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -67,6 +67,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** */
private static final long RETRY_DELAY = 1;
+ /**
+ * Initialization latch. Public data structure operations block on it via
+ * {@code awaitInitialization()}; it is counted down exactly once, either at the end of
+ * start-up or by {@code onKernalStop(boolean)} if start-up never got that far.
+ */
+ private final CountDownLatch initLatch = new CountDownLatch(1);
+
+ /**
+ * Initialization failed flag. Set by {@code onKernalStop(boolean)} immediately before it
+ * releases {@link #initLatch}, when the latch was still closed at stop time.
+ */
+ private boolean initFailed;
+
/** Cache contains only {@code GridCacheInternal,GridCacheInternal}. */
private IgniteInternalCache<GridCacheInternal, GridCacheInternal> dsView;
@@ -145,6 +151,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx = atomicsCache.context();
}
+
+ initLatch.countDown();
}
/**
@@ -167,6 +175,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
@Override public void onKernalStop(boolean cancel) {
super.onKernalStop(cancel);
+ // The latch is still closed, so initialization never completed. Raise the failure flag
+ // BEFORE counting down so that threads released from awaitInitialization() observe it.
+ if (initLatch.getCount() > 0) {
+ initFailed = true;
+
+ initLatch.countDown();
+ }
+
if (qryId != null)
dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
}
@@ -187,6 +201,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
{
A.notNull(name, "name");
+ awaitInitialization();
+
checkAtomicsConfiguration();
startQuery();
@@ -277,6 +293,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
public final void removeSequence(final String name) throws IgniteCheckedException {
assert name != null;
+ awaitInitialization();
+
checkAtomicsConfiguration();
removeDataStructure(new IgniteCallable<Void>() {
@@ -315,6 +333,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
final boolean create) throws IgniteCheckedException {
A.notNull(name, "name");
+ awaitInitialization();
+
checkAtomicsConfiguration();
startQuery();
@@ -431,6 +451,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
assert name != null;
assert dsCacheCtx != null;
+ awaitInitialization();
+
removeDataStructure(new IgniteCallable<Void>() {
@Override public Void call() throws Exception {
dsCacheCtx.gate().enter();
@@ -520,6 +542,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
{
A.notNull(name, "name");
+ awaitInitialization();
+
checkAtomicsConfiguration();
startQuery();
@@ -585,6 +609,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
assert name != null;
assert dsCacheCtx != null;
+ awaitInitialization();
+
removeDataStructure(new IgniteCallable<Void>() {
@Override public Void call() throws Exception {
dsCacheCtx.gate().enter();
@@ -623,6 +649,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
final S initStamp, final boolean create) throws IgniteCheckedException {
A.notNull(name, "name");
+ awaitInitialization();
+
checkAtomicsConfiguration();
startQuery();
@@ -688,6 +716,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
assert name != null;
assert dsCacheCtx != null;
+ awaitInitialization();
+
removeDataStructure(new IgniteCallable<Void>() {
@Override public Void call() throws Exception {
dsCacheCtx.gate().enter();
@@ -725,6 +755,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
throws IgniteCheckedException {
A.notNull(name, "name");
+ awaitInitialization();
+
String cacheName = null;
if (cfg != null) {
@@ -801,6 +833,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
assert name != null;
assert cctx != null;
+ awaitInitialization();
+
IgniteCallable<GridCacheQueueHeader> rmv = new IgniteCallable<GridCacheQueueHeader>() {
@Override public GridCacheQueueHeader call() throws Exception {
return (GridCacheQueueHeader)retryRemove(cctx.cache(), new GridCacheQueueHeaderKey(name));
@@ -837,6 +871,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
boolean create)
throws IgniteCheckedException
{
+ awaitInitialization();
+
Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
@@ -887,6 +923,24 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/**
+ * Awaits for processor initialization.
+ * <p>
+ * Blocks while {@code initLatch} is still closed, then fails if the latch was released by
+ * {@code onKernalStop(boolean)} (which sets {@code initFailed}) instead of by a completed
+ * start-up.
+ *
+ * @throws IllegalStateException If initialization failed or the waiting thread was interrupted.
+ */
+ private void awaitInitialization() {
+ if (initLatch.getCount() > 0) {
+ try {
+ U.await(initLatch);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IllegalStateException("Failed to initialize data structures processor " +
+ "(thread has been interrupted).", e);
+ }
+ }
+
+ // Check the flag on EVERY call, not only for threads that actually waited. A caller that
+ // arrives after onKernalStop() has already released the latch sees getCount() == 0 and
+ // would otherwise proceed against a processor that was never initialized.
+ // initFailed is written before countDown(), so it is visible once the latch reads as open.
+ if (initFailed)
+ throw new IllegalStateException("Failed to initialize data structures processor.");
+ }
+
+ /**
* @param dsMap Map with data structure information.
* @param info New data structure information.
* @param create Create flag.
@@ -930,6 +984,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
{
A.notNull(name, "name");
+ awaitInitialization();
+
if (create)
A.ensure(cnt >= 0, "count can not be negative");
@@ -997,9 +1053,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
assert name != null;
assert dsCacheCtx != null;
+ awaitInitialization();
+
removeDataStructure(new IgniteCallable<Void>() {
- @Override
- public Void call() throws Exception {
+ @Override public Void call() throws Exception {
GridCacheInternal key = new GridCacheInternalKeyImpl(name);
dsCacheCtx.gate().enter();
@@ -1169,6 +1226,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
throws IgniteCheckedException {
A.notNull(name, "name");
+ awaitInitialization();
+
String cacheName = null;
if (cfg != null)
@@ -1196,6 +1255,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
assert name != null;
assert cctx != null;
+ awaitInitialization();
+
IgniteCallable<GridCacheSetHeader> rmv = new IgniteCallable<GridCacheSetHeader>() {
@Override public GridCacheSetHeader call() throws Exception {
return (GridCacheSetHeader)retryRemove(cctx.cache(), new GridCacheSetHeaderKey(name));
@@ -1326,7 +1387,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/**
*
*/
- static enum DataStructureType {
+ enum DataStructureType {
/** */
ATOMIC_LONG(IgniteAtomicLong.class.getSimpleName()),