You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/09 00:28:44 UTC
[06/42] incubator-ignite git commit: # ignite-883
# ignite-883
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/873e01bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/873e01bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/873e01bb
Branch: refs/heads/ignite-389
Commit: 873e01bbb61ed5782635fa81e1c9834f261f50fe
Parents: 7a2e898
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 12:29:00 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 3 16:52:47 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 38 +-
.../internal/managers/GridManagerAdapter.java | 9 +
.../checkpoint/GridCheckpointManager.java | 52 ++-
.../processors/cache/GridCacheAdapter.java | 4 +
.../processors/cache/GridCacheTtlManager.java | 9 +-
.../cache/transactions/IgniteTxManager.java | 3 -
.../datastructures/DataStructuresProcessor.java | 66 +--
.../timeout/GridSpiTimeoutObject.java | 59 +++
.../timeout/GridTimeoutProcessor.java | 24 +-
.../util/nio/GridCommunicationClient.java | 26 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 23 +-
.../org/apache/ignite/spi/IgniteSpiContext.java | 10 +
.../ignite/spi/IgniteSpiTimeoutObject.java | 40 ++
.../spi/checkpoint/noop/NoopCheckpointSpi.java | 3 +-
.../communication/tcp/TcpCommunicationSpi.java | 409 ++++++-------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 3 -
.../ignite/spi/discovery/tcp/ServerImpl.java | 1 -
.../spi/discovery/tcp/TcpDiscoverySpi.java | 156 +------
.../IgniteCountDownLatchAbstractSelfTest.java | 102 +++++
.../IgniteCacheClientNearCacheExpiryTest.java | 103 +++++
.../IgniteCacheExpiryPolicyTestSuite.java | 2 +
.../testframework/GridSpiTestContext.java | 10 +
22 files changed, 642 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index ed33365..5cbe377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1447,6 +1447,17 @@ public class IgnitionEx {
ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
}
+ execSvc = new IgniteThreadPoolExecutor(
+ "pub-" + cfg.getGridName(),
+ cfg.getPublicThreadPoolSize(),
+ cfg.getPublicThreadPoolSize(),
+ DFLT_PUBLIC_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+ if (!myCfg.isClientMode())
+ // Pre-start all threads as they are guaranteed to be needed.
+ ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads();
+
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
sysExecSvc = new IgniteThreadPoolExecutor(
@@ -1456,30 +1467,8 @@ public class IgnitionEx {
DFLT_SYSTEM_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
- boolean isClientMode = Boolean.TRUE.equals(myCfg.isClientMode());
-
- if (isClientMode) {
- execSvc = new IgniteThreadPoolExecutor(
- "pub-" + cfg.getGridName(),
- 0,
- cfg.getPublicThreadPoolSize(),
- 2000,
- new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
- }
- else {
- execSvc = new IgniteThreadPoolExecutor(
- "pub-" + cfg.getGridName(),
- cfg.getPublicThreadPoolSize(),
- cfg.getPublicThreadPoolSize(),
- DFLT_PUBLIC_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
- // Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
- // Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
- }
+ // Pre-start all threads as they are guaranteed to be needed.
+ ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads();
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
@@ -2007,7 +1996,6 @@ public class IgnitionEx {
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setCacheMode(cfg.getCacheMode());
ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
- ccfg.setNearConfiguration(new NearCacheConfiguration());
if (cfg.getCacheMode() == PARTITIONED)
ccfg.setBackups(cfg.getBackups());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 1eb7143..bea4256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -483,6 +484,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
return ctx.discovery().tryFailNode(nodeId);
}
+ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+ }
+
+ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+ }
+
/**
* @param e Exception to handle.
* @return GridSpiException Converted exception.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 2e80b6f..ce2a36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
private final GridMessageListener lsnr = new CheckpointRequestListener();
/** */
- private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap;
/** */
- private final Collection<IgniteUuid> closedSess = new GridBoundedConcurrentLinkedHashSet<>(
- MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q);
+ private final Collection<IgniteUuid> closedSess;
/** Grid marshaller. */
private final Marshaller marsh;
@@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
super(ctx, ctx.config().getCheckpointSpi());
marsh = ctx.config().getMarshaller();
+
+ if (enabled()) {
+ keyMap = new ConcurrentHashMap8<>();
+
+ closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS,
+ MAX_CLOSED_SESS,
+ 0.75f,
+ 256,
+ PER_SEGMENT_Q);
+ }
+ else {
+ keyMap = null;
+
+ closedSess = null;
+ }
}
/** {@inheritDoc} */
@@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return Session IDs.
*/
public Collection<IgniteUuid> sessionIds() {
- return new ArrayList<>(keyMap.keySet());
+ return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.<IgniteUuid>emptyList();
}
/**
@@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return {@code true} if checkpoint has been actually saved, {@code false} otherwise.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope,
- long timeout, boolean override) throws IgniteCheckedException {
+ public boolean storeCheckpoint(GridTaskSessionInternal ses,
+ String key,
+ Object state,
+ ComputeTaskSessionScope scope,
+ long timeout,
+ boolean override)
+ throws IgniteCheckedException
+ {
+ if (!enabled())
+ return false;
+
assert ses != null;
assert key != null;
@@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return Whether or not checkpoint was removed.
*/
public boolean removeCheckpoint(String key) {
+ if (!enabled())
+ return false;
+
assert key != null;
boolean rmv = false;
@@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return Whether or not checkpoint was removed.
*/
public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) {
+ if (!enabled())
+ return false;
+
assert ses != null;
assert key != null;
@@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException {
+ if (!enabled())
+ return null;
+
assert ses != null;
assert key != null;
@@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @param cleanup Whether cleanup or not.
*/
public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) {
+ if (!enabled())
+ return;
+
closedSess.add(ses.getId());
// If on task node.
@@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
@Override public void printMemoryStats() {
X.println(">>>");
X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']');
- X.println(">>> keyMap: " + keyMap.size());
+ X.println(">>> keyMap: " + (keyMap != null ? keyMap.size() : 0));
}
/**
@@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
if (log.isDebugEnabled())
log.debug("Received checkpoint request: " + req);
+ if (!enabled())
+ return;
+
IgniteUuid sesId = req.getSessionId();
if (closedSess.contains(sesId)) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9d98ce7..4216895 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -395,6 +395,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+ assert !CU.isUtilityCache(ctx.name());
+ assert !CU.isAtomicsCache(ctx.name());
+ assert !CU.isMarshallerCache(ctx.name());
+
CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5f9049a..9bd6321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
- if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
+ boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
+ !cctx.config().isEagerTtl() ||
+ CU.isAtomicsCache(cctx.name()) ||
+ CU.isMarshallerCache(cctx.name()) ||
+ CU.isUtilityCache(cctx.name()) ||
+ (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
+
+ if (cleanupDisabled)
return;
cleanupWorker = new CleanupWorker();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4666cca..b6c77f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1221,9 +1221,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
collectPendingVersions(dhtTxLoc);
}
- // 3.1 Call dataStructures manager.
- cctx.kernalContext().dataStructures().onTxCommitted(tx);
-
// 4. Unlock write resources.
unlockMultiple(tx, tx.writeEntries());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 72911af..27f6a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -32,6 +32,7 @@ import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
+import javax.cache.event.*;
import javax.cache.processor.*;
import java.io.*;
import java.util.*;
@@ -40,7 +41,6 @@ import java.util.concurrent.*;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
@@ -99,6 +99,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** */
private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache;
+ /** */
+ private UUID qryId;
+
/**
* @param ctx Context.
*/
@@ -112,7 +115,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onKernalStart() {
+ @Override public void onKernalStart() throws IgniteCheckedException {
if (ctx.config().isDaemon())
return;
@@ -139,10 +142,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
seqView = atomicsCache;
- dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
+ dsCacheCtx = atomicsCache.context();
+
+ qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+ null,
+ dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+ false);
}
}
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (qryId != null)
+ dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
+ }
+
/**
* Gets a sequence from cache or creates one if it's not cached.
*
@@ -906,8 +922,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheCountDownLatchValue val = cast(dsView.get(key),
- GridCacheCountDownLatchValue.class);
+ GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
// Check that count down hasn't been created in other thread yet.
GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);
@@ -1034,28 +1049,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/**
- * Transaction committed callback for transaction manager.
*
- * @param tx Committed transaction.
*/
- public <K, V> void onTxCommitted(IgniteInternalTx tx) {
- if (dsCacheCtx == null)
- return;
-
- if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) {
- Collection<IgniteTxEntry> entries = tx.writeEntries();
-
- if (log.isDebugEnabled())
- log.debug("Committed entries: " + entries);
-
- for (IgniteTxEntry entry : entries) {
- // Check updated or created GridCacheInternalKey keys.
- if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key().internal()) {
- GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
-
- Object val0 = CU.value(entry.value(), entry.context(), false);
+ private class DataStructuresEntryListener implements CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
+ if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
+ GridCacheInternal val0 = evt.getValue();
if (val0 instanceof GridCacheCountDownLatchValue) {
+ GridCacheInternalKey key = evt.getKey();
+
// Notify latch on changes.
GridCacheRemovable latch = dsMap.get(key);
@@ -1067,8 +1073,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
latch0.onUpdate(val.get());
if (val.get() == 0 && val.autoDelete()) {
- entry.cached().markObsolete(dsCacheCtx.versions().next());
-
dsMap.remove(key);
latch.onRemoved();
@@ -1080,11 +1084,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
", actual=" + latch.getClass() + ", value=" + latch + ']');
}
}
+
}
+ else {
+ assert evt.getEventType() == EventType.REMOVED : evt;
- // Check deleted GridCacheInternal keys.
- if (entry.op() == DELETE && entry.key().internal()) {
- GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
+ GridCacheInternal key = evt.getKey();
// Entry's val is null if entry deleted.
GridCacheRemovable obj = dsMap.remove(key);
@@ -1094,6 +1099,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
}
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataStructuresEntryListener.class, this);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
new file mode 100644
index 0000000..82267a2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.timeout;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+
+/**
+ * Wrapper for {@link IgniteSpiTimeoutObject}.
+ */
+public class GridSpiTimeoutObject implements GridTimeoutObject {
+ /** */
+ @GridToStringInclude
+ private final IgniteSpiTimeoutObject obj;
+
+ /**
+ * @param obj SPI object.
+ */
+ public GridSpiTimeoutObject(IgniteSpiTimeoutObject obj) {
+ this.obj = obj;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ obj.onTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return obj.id();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return obj.endTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final String toString() {
+ return S.toString(GridSpiTimeoutObject.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index e9b7717..e4f370c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
@@ -111,8 +112,8 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
* @return Cancelable to cancel task.
*/
public CancelableTask schedule(Runnable task, long delay, long period) {
- assert delay >= 0;
- assert period > 0 || period == -1;
+ assert delay >= 0 : delay;
+ assert period > 0 || period == -1 : period;
CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
@@ -203,7 +204,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
*/
public class CancelableTask implements GridTimeoutObject, Closeable {
/** */
- private final IgniteUuid id = new IgniteUuid();
+ private final IgniteUuid id = IgniteUuid.randomUuid();
/** */
private long endTime;
@@ -215,12 +216,13 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
private volatile boolean cancel;
/** */
+ @GridToStringInclude
private final Runnable task;
/**
+ * @param task Task to execute.
* @param firstTime First time.
* @param period Period.
- * @param task Task to execute.
*/
CancelableTask(Runnable task, long firstTime, long period) {
this.task = task;
@@ -243,19 +245,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
if (cancel)
return;
- long startTime = U.currentTimeMillis();
-
try {
task.run();
}
finally {
- long executionTime = U.currentTimeMillis() - startTime;
-
- if (executionTime > 10) {
- U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " +
- "quickly [executionTime=" + executionTime + ']');
- }
-
if (!cancel && period > 0) {
endTime = U.currentTimeMillis() + period;
@@ -273,5 +266,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
removeTimeoutObject(this);
}
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CancelableTask.class, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 31396fb..2f7fd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -38,58 +38,58 @@ public interface GridCommunicationClient {
* @param handshakeC Handshake.
* @throws IgniteCheckedException If handshake failed.
*/
- void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
+ public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
/**
* @return {@code True} if client has been closed by this call,
* {@code false} if failed to close client (due to concurrent reservation or concurrent close).
*/
- boolean close();
+ public boolean close();
/**
* Forces client close.
*/
- void forceClose();
+ public void forceClose();
/**
* @return {@code True} if client is closed;
*/
- boolean closed();
+ public boolean closed();
/**
* @return {@code True} if client was reserved, {@code false} otherwise.
*/
- boolean reserve();
+ public boolean reserve();
/**
* Releases this client by decreasing reservations.
*/
- void release();
+ public void release();
/**
* @return {@code True} if client was reserved.
*/
- boolean reserved();
+ public boolean reserved();
/**
* Gets idle time of this client.
*
* @return Idle time of this client.
*/
- long getIdleTime();
+ public long getIdleTime();
/**
* @param data Data to send.
* @throws IgniteCheckedException If failed.
*/
- void sendMessage(ByteBuffer data) throws IgniteCheckedException;
+ public void sendMessage(ByteBuffer data) throws IgniteCheckedException;
/**
* @param data Data to send.
* @param len Length.
* @throws IgniteCheckedException If failed.
*/
- void sendMessage(byte[] data, int len) throws IgniteCheckedException;
+ public void sendMessage(byte[] data, int len) throws IgniteCheckedException;
/**
* @param nodeId Node ID (provided only if versions of local and remote nodes are different).
@@ -97,16 +97,16 @@ public interface GridCommunicationClient {
* @throws IgniteCheckedException If failed.
* @return {@code True} if should try to resend message.
*/
- boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
+ public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
/**
* @param timeout Timeout.
* @throws IOException If failed.
*/
- void flushIfNeeded(long timeout) throws IOException;
+ public void flushIfNeeded(long timeout) throws IOException;
/**
* @return {@code True} if send is asynchronous.
*/
- boolean async();
+ public boolean async();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index d3c0587..c9c633f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -542,10 +543,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
return U.spiAttribute(this, attrName);
}
+ /** {@inheritDoc} */
+ protected void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ spiCtx.addTimeoutObject(obj);
+ }
+
+ /** {@inheritDoc} */
+ protected void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ spiCtx.removeTimeoutObject(obj);
+ }
+
/**
* Temporarily SPI context.
*/
- private static class GridDummySpiContext implements IgniteSpiContext {
+ private class GridDummySpiContext implements IgniteSpiContext {
/** */
private final ClusterNode locNode;
@@ -716,5 +727,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
@Override public boolean tryFailNode(UUID nodeId) {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 55f46e5..f83326c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -310,4 +310,14 @@ public interface IgniteSpiContext {
* @return If node was failed.
*/
public boolean tryFailNode(UUID nodeId);
+
+ /**
+ * @param c Timeout object.
+ */
+ public void addTimeoutObject(IgniteSpiTimeoutObject c);
+
+ /**
+ * @param c Timeout object.
+ */
+ public void removeTimeoutObject(IgniteSpiTimeoutObject c);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
new file mode 100644
index 0000000..f7fbd27f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.lang.*;
+
+/**
+ *
+ */
+public interface IgniteSpiTimeoutObject {
+ /**
+ * @return Unique object ID.
+ */
+ public IgniteUuid id();
+
+ /**
+ * @return End time.
+ */
+ public long endTime();
+
+ /**
+ * Timeout callback.
+ */
+ public void onTimeout();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
index 460cff3..832d872 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
@@ -51,8 +51,7 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
}
/** {@inheritDoc} */
- @Override
- public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) throws IgniteSpiException {
+ @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 19e54c8..b324ab2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -267,7 +267,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug("Session was closed but there are unacknowledged messages, " +
"will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
- recoveryWorker.addReconnectRequest(recoveryData);
+ commWorker.addReconnectRequest(recoveryData);
}
}
else
@@ -647,17 +647,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Socket write timeout. */
private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
- /** Idle client worker. */
- private IdleClientWorker idleClientWorker;
-
/** Flush client worker. */
private ClientFlushWorker clientFlushWorker;
- /** Socket timeout worker. */
- private SocketTimeoutWorker sockTimeoutWorker;
-
- /** Recovery worker. */
- private RecoveryWorker recoveryWorker;
+ /** Recovery and idle clients handler. */
+ private CommunicationWorker commWorker;
/** Clients. */
private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
@@ -1274,13 +1268,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
nioSrvr.start();
- idleClientWorker = new IdleClientWorker();
-
- idleClientWorker.start();
-
- recoveryWorker = new RecoveryWorker();
+ commWorker = new CommunicationWorker();
- recoveryWorker.start();
+ commWorker.start();
if (connBufSize > 0) {
clientFlushWorker = new ClientFlushWorker();
@@ -1288,10 +1278,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
clientFlushWorker.start();
}
- sockTimeoutWorker = new SocketTimeoutWorker();
-
- sockTimeoutWorker.start();
-
// Ack start.
if (log.isDebugEnabled())
log.debug(startInfo());
@@ -1445,15 +1431,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (nioSrvr != null)
nioSrvr.stop();
- U.interrupt(idleClientWorker);
U.interrupt(clientFlushWorker);
- U.interrupt(sockTimeoutWorker);
- U.interrupt(recoveryWorker);
+ U.interrupt(commWorker);
- U.join(idleClientWorker, log);
U.join(clientFlushWorker, log);
- U.join(sockTimeoutWorker, log);
- U.join(recoveryWorker, log);
+ U.join(commWorker, log);
// Force closing on stop (safety).
for (GridCommunicationClient client : clients.values())
@@ -1461,7 +1443,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Clear resources.
nioSrvr = null;
- idleClientWorker = null;
+ commWorker = null;
boundTcpPort = -1;
@@ -1899,7 +1881,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
- sockTimeoutWorker.addTimeoutObject(obj);
+ addTimeoutObject(obj);
long rcvCnt = 0;
@@ -2005,7 +1987,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
boolean cancelled = obj.cancel();
if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
+ removeTimeoutObject(obj);
// Ignoring whatever happened after timeout - reporting only timeout event.
if (!cancelled)
@@ -2041,15 +2023,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (nioSrvr != null)
nioSrvr.stop();
- U.interrupt(idleClientWorker);
U.interrupt(clientFlushWorker);
- U.interrupt(sockTimeoutWorker);
- U.interrupt(recoveryWorker);
+ U.interrupt(commWorker);
- U.join(idleClientWorker, log);
U.join(clientFlushWorker, log);
- U.join(sockTimeoutWorker, log);
- U.join(recoveryWorker, log);
+ U.join(commWorker, log);
for (GridCommunicationClient client : clients.values())
client.forceClose();
@@ -2156,119 +2134,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
*
*/
- private class IdleClientWorker extends IgniteSpiThread {
- /**
- *
- */
- IdleClientWorker() {
- super(gridName, "nio-idle-client-collector", log);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"BusyWait"})
- @Override protected void body() throws InterruptedException {
- while (!isInterrupted()) {
- cleanupRecovery();
-
- for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
- UUID nodeId = e.getKey();
-
- GridCommunicationClient client = e.getValue();
-
- ClusterNode node = getSpiContext().node(nodeId);
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Forcing close of non-existent node connection: " + nodeId);
-
- client.forceClose();
-
- clients.remove(nodeId, client);
-
- continue;
- }
-
- GridNioRecoveryDescriptor recovery = null;
-
- if (client instanceof GridTcpNioCommunicationClient) {
- recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
-
- if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
- RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
-
- if (log.isDebugEnabled())
- log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
- ", rcvCnt=" + msg.received() + ']');
-
- nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
-
- recovery.lastAcknowledged(msg.received());
-
- continue;
- }
- }
-
- long idleTime = client.getIdleTime();
-
- if (idleTime >= idleConnTimeout) {
- if (recovery != null &&
- recovery.nodeAlive(getSpiContext().node(nodeId)) &&
- !recovery.messagesFutures().isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Node connection is idle, but there are unacknowledged messages, " +
- "will wait: " + nodeId);
-
- continue;
- }
-
- if (log.isDebugEnabled())
- log.debug("Closing idle node connection: " + nodeId);
-
- if (client.close() || client.closed())
- clients.remove(nodeId, client);
- }
- }
-
- Thread.sleep(idleConnTimeout);
- }
- }
-
- /**
- *
- */
- private void cleanupRecovery() {
- Set<ClientKey> left = null;
-
- for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
- if (left != null && left.contains(e.getKey()))
- continue;
-
- GridNioRecoveryDescriptor recoverySnd = e.getValue();
-
- if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
- if (left == null)
- left = new HashSet<>();
-
- left.add(e.getKey());
- }
- }
-
- if (left != null) {
- assert !left.isEmpty();
-
- for (ClientKey id : left) {
- GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
-
- if (recoverySnd != null)
- recoverySnd.onNodeLeft();
- }
- }
- }
- }
-
- /**
- *
- */
private class ClientFlushWorker extends IgniteSpiThread {
/**
*
@@ -2319,157 +2184,164 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
- * Handles sockets timeouts.
+ *
*/
- private class SocketTimeoutWorker extends IgniteSpiThread {
- /** Time-based sorted set for timeout objects. */
- private final GridConcurrentSkipListSet<HandshakeTimeoutObject> timeoutObjs =
- new GridConcurrentSkipListSet<>(new Comparator<HandshakeTimeoutObject>() {
- @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) {
- long time1 = o1.endTime();
- long time2 = o2.endTime();
-
- long id1 = o1.id();
- long id2 = o2.id();
-
- return time1 < time2 ? -1 : time1 > time2 ? 1 :
- id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
- }
- });
-
- /** Mutex. */
- private final Object mux0 = new Object();
+ private class CommunicationWorker extends IgniteSpiThread {
+ /** */
+ private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
/**
*
*/
- SocketTimeoutWorker() {
- super(gridName, "tcp-comm-sock-timeout-worker", log);
+ private CommunicationWorker() {
+ super(gridName, "tcp-comm-worker", log);
}
- /**
- * @param timeoutObj Timeout object to add.
- */
- @SuppressWarnings({"NakedNotify"})
- public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) {
- assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("Tcp communication worker has been started.");
- timeoutObjs.add(timeoutObj);
+ while (!isInterrupted()) {
+ GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
- if (timeoutObjs.firstx() == timeoutObj) {
- synchronized (mux0) {
- mux0.notifyAll();
- }
+ if (recoveryDesc != null)
+ processRecovery(recoveryDesc);
+ else
+ processIdle();
}
}
/**
- * @param timeoutObj Timeout object to remove.
+ *
*/
- public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) {
- assert timeoutObj != null;
+ private void processIdle() {
+ cleanupRecovery();
- timeoutObjs.remove(timeoutObj);
- }
+ for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+ UUID nodeId = e.getKey();
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Socket timeout worker has been started.");
+ GridCommunicationClient client = e.getValue();
- while (!isInterrupted()) {
- long now = U.currentTimeMillis();
+ ClusterNode node = getSpiContext().node(nodeId);
- for (Iterator<HandshakeTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
- HandshakeTimeoutObject timeoutObj = iter.next();
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Forcing close of non-existent node connection: " + nodeId);
+
+ client.forceClose();
+
+ clients.remove(nodeId, client);
+
+ continue;
+ }
- if (timeoutObj.endTime() <= now) {
- iter.remove();
+ GridNioRecoveryDescriptor recovery = null;
- timeoutObj.onTimeout();
+ if (client instanceof GridTcpNioCommunicationClient) {
+ recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+
+ if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+ RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+ if (log.isDebugEnabled())
+ log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+ ", rcvCnt=" + msg.received() + ']');
+
+ nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+
+ recovery.lastAcknowledged(msg.received());
+
+ continue;
}
- else
- break;
}
- synchronized (mux0) {
- while (true) {
- // Access of the first element must be inside of
- // synchronization block, so we don't miss out
- // on thread notification events sent from
- // 'addTimeoutObject(..)' method.
- HandshakeTimeoutObject first = timeoutObjs.firstx();
+ long idleTime = client.getIdleTime();
- if (first != null) {
- long waitTime = first.endTime() - U.currentTimeMillis();
+ if (idleTime >= idleConnTimeout) {
+ if (recovery != null &&
+ recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+ !recovery.messagesFutures().isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Node connection is idle, but there are unacknowledged messages, " +
+ "will wait: " + nodeId);
- if (waitTime > 0)
- mux0.wait(waitTime);
- else
- break;
- }
- else
- mux0.wait(5000);
+ continue;
}
+
+ if (log.isDebugEnabled())
+ log.debug("Closing idle node connection: " + nodeId);
+
+ if (client.close() || client.closed())
+ clients.remove(nodeId, client);
}
}
}
- }
-
- /**
- *
- */
- private class RecoveryWorker extends IgniteSpiThread {
- /** */
- private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
/**
*
*/
- private RecoveryWorker() {
- super(gridName, "tcp-comm-recovery-worker", log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Recovery worker has been started.");
+ private void cleanupRecovery() {
+ Set<ClientKey> left = null;
- while (!isInterrupted()) {
- GridNioRecoveryDescriptor recoveryDesc = q.take();
+ for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+ if (left != null && left.contains(e.getKey()))
+ continue;
- assert recoveryDesc != null;
+ GridNioRecoveryDescriptor recoverySnd = e.getValue();
- ClusterNode node = recoveryDesc.node();
+ if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+ if (left == null)
+ left = new HashSet<>();
- if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
- continue;
+ left.add(e.getKey());
+ }
+ }
- try {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+ if (left != null) {
+ assert !left.isEmpty();
- GridCommunicationClient client = reserveClient(node);
+ for (ClientKey id : left) {
+ GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
- client.release();
+ if (recoverySnd != null)
+ recoverySnd.onNodeLeft();
}
- catch (IgniteCheckedException | IgniteException e) {
- if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, will retry " +
- "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+ }
+ }
- addReconnectRequest(recoveryDesc);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, " +
- "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+ /**
+ * @param recoveryDesc Recovery descriptor.
+ */
+ private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
+ ClusterNode node = recoveryDesc.node();
- onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
- e);
- }
+ if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+ return;
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+
+ GridCommunicationClient client = reserveClient(node);
+
+ client.release();
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect failed, will retry " +
+ "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+ addReconnectRequest(recoveryDesc);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect failed, " +
+ "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+ onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
+ e);
}
}
}
@@ -2497,12 +2369,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
*
*/
- private static class HandshakeTimeoutObject<T> {
- /** */
- private static final AtomicLong idGen = new AtomicLong();
-
+ private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
/** */
- private final long id = idGen.incrementAndGet();
+ private final IgniteUuid id = IgniteUuid.randomUuid();
/** */
private final T obj;
@@ -2533,34 +2402,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return done.compareAndSet(false, true);
}
- /**
- * @return {@code True} if object has not yet been canceled.
- */
- boolean onTimeout() {
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
if (done.compareAndSet(false, true)) {
// Close socket - timeout occurred.
if (obj instanceof GridCommunicationClient)
((GridCommunicationClient)obj).forceClose();
else
U.closeQuiet((AbstractInterruptibleChannel)obj);
-
- return true;
}
-
- return false;
}
- /**
- * @return End time.
- */
- long endTime() {
+ /** {@inheritDoc} */
+ @Override public long endTime() {
return endTime;
}
- /**
- * @return ID.
- */
- long id() {
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
return id;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e672d64..5b66019 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -116,7 +116,6 @@ class ClientImpl extends TcpDiscoveryImpl {
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
- b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
b.append(U.nl());
@@ -524,11 +523,9 @@ class ClientImpl extends TcpDiscoveryImpl {
U.interrupt(sockWriter);
U.interrupt(msgWorker);
- U.interrupt(spi.sockTimeoutWorker);
U.join(sockWriter, log);
U.join(msgWorker, log);
- U.join(spi.sockTimeoutWorker, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 57c13d6..485f57d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1412,7 +1412,6 @@ class ServerImpl extends TcpDiscoveryImpl {
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
- b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 56fb63f..8365716 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -255,9 +255,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Internal and external addresses of local node. */
protected Collection<InetSocketAddress> locNodeAddrs;
- /** Socket timeout worker. */
- protected SocketTimeoutWorker sockTimeoutWorker;
-
/** Start time of the very first grid node. */
protected volatile long gridStartTime;
@@ -1118,7 +1115,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
- sockTimeoutWorker.addTimeoutObject(obj);
+ addTimeoutObject(obj);
IOException err = null;
@@ -1136,7 +1133,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
boolean cancelled = obj.cancel();
if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
+ removeTimeoutObject(obj);
// Throw original exception.
if (err != null)
@@ -1180,7 +1177,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
- sockTimeoutWorker.addTimeoutObject(obj);
+ addTimeoutObject(obj);
IOException err = null;
@@ -1198,7 +1195,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
boolean cancelled = obj.cancel();
if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
+ removeTimeoutObject(obj);
// Throw original exception.
if (err != null)
@@ -1222,7 +1219,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
- sockTimeoutWorker.addTimeoutObject(obj);
+ addTimeoutObject(obj);
OutputStream out = sock.getOutputStream();
@@ -1240,7 +1237,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
boolean cancelled = obj.cancel();
if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
+ removeTimeoutObject(obj);
// Throw original exception.
if (err != null)
@@ -1599,9 +1596,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
mcastIpFinder.setLocalAddress(locAddr);
}
- sockTimeoutWorker = new SocketTimeoutWorker();
- sockTimeoutWorker.start();
-
impl.spiStart(gridName);
}
@@ -1611,9 +1605,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
// Safety.
ctxInitLatch.countDown();
- U.interrupt(sockTimeoutWorker);
- U.join(sockTimeoutWorker, log);
-
if (ipFinder != null) {
try {
ipFinder.close();
@@ -1754,117 +1745,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
- * Handles sockets timeouts.
- */
- protected class SocketTimeoutWorker extends IgniteSpiThread {
- /** Time-based sorted set for timeout objects. */
- private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
- new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
- @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
- int res = Long.compare(o1.endTime(), o2.endTime());
-
- if (res != 0)
- return res;
-
- return Long.compare(o1.id(), o2.id());
- }
- });
-
- /** Mutex. */
- private final Object mux0 = new Object();
-
- /**
- *
- */
- SocketTimeoutWorker() {
- super(gridName, "tcp-disco-sock-timeout-worker", log);
-
- setPriority(threadPri);
- }
-
- /**
- * @param timeoutObj Timeout object to add.
- */
- @SuppressWarnings({"NakedNotify"})
- public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
- assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
-
- timeoutObjs.add(timeoutObj);
-
- if (timeoutObjs.firstx() == timeoutObj) {
- synchronized (mux0) {
- mux0.notifyAll();
- }
- }
- }
-
- /**
- * @param timeoutObj Timeout object to remove.
- */
- public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
- assert timeoutObj != null;
-
- timeoutObjs.remove(timeoutObj);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Socket timeout worker has been started.");
-
- while (!isInterrupted()) {
- long now = U.currentTimeMillis();
-
- for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
- SocketTimeoutObject timeoutObj = iter.next();
-
- if (timeoutObj.endTime() <= now) {
- iter.remove();
-
- if (timeoutObj.onTimeout()) {
- LT.warn(log, null, "Socket write has timed out (consider increasing " +
- "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
-
- stats.onSocketTimeout();
- }
- }
- else
- break;
- }
-
- synchronized (mux0) {
- while (true) {
- // Access of the first element must be inside of
- // synchronization block, so we don't miss out
- // on thread notification events sent from
- // 'addTimeoutObject(..)' method.
- SocketTimeoutObject first = timeoutObjs.firstx();
-
- if (first != null) {
- long waitTime = first.endTime() - U.currentTimeMillis();
-
- if (waitTime > 0)
- mux0.wait(waitTime);
- else
- break;
- }
- else
- mux0.wait(5000);
- }
- }
- }
- }
- }
-
- /**
* Socket timeout object.
*/
- private static class SocketTimeoutObject {
+ private class SocketTimeoutObject implements IgniteSpiTimeoutObject {
/** */
- private static final AtomicLong idGen = new AtomicLong();
-
- /** */
- private final long id = idGen.incrementAndGet();
+ private final IgniteUuid id = IgniteUuid.randomUuid();
/** */
private final Socket sock;
@@ -1894,31 +1779,26 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return done.compareAndSet(false, true);
}
- /**
- * @return {@code True} if object has not yet been canceled.
- */
- boolean onTimeout() {
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
if (done.compareAndSet(false, true)) {
// Close socket - timeout occurred.
U.closeQuiet(sock);
- return true;
- }
+ LT.warn(log, null, "Socket write has timed out (consider increasing " +
+ "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
- return false;
+ stats.onSocketTimeout();
+ }
}
- /**
- * @return End time.
- */
- long endTime() {
+ /** {@inheritDoc} */
+ @Override public long endTime() {
return endTime;
}
- /**
- * @return ID.
- */
- long id() {
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
return id;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
index 0f2a898..80e6123 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
@@ -28,6 +28,7 @@ import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
@@ -258,6 +259,107 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
checkRemovedLatch(latch);
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLatchMultinode1() throws Exception {
+ if (gridCount() == 1)
+ return;
+
+ IgniteCountDownLatch latch = grid(0).countDownLatch("l1", 10,
+ true,
+ true);
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ final AtomicBoolean countedDown = new AtomicBoolean();
+
+ for (int i = 0; i < gridCount(); i++) {
+ final Ignite ignite = grid(i);
+
+ futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteCountDownLatch latch = ignite.countDownLatch("l1", 10,
+ true,
+ false);
+
+ assertNotNull(latch);
+
+ boolean wait = latch.await(30_000);
+
+ assertTrue(countedDown.get());
+
+ assertEquals(0, latch.count());
+
+ assertTrue(wait);
+
+ return null;
+ }
+ }));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ if (i == 9)
+ countedDown.set(true);
+
+ latch.countDown();
+ }
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get(30_000);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLatchMultinode2() throws Exception {
+ if (gridCount() == 1)
+ return;
+
+ IgniteCountDownLatch latch = grid(0).countDownLatch("l2", gridCount() * 3,
+ true,
+ true);
+
+ assertNotNull(latch);
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ final AtomicInteger cnt = new AtomicInteger();
+
+ for (int i = 0; i < gridCount(); i++) {
+ final Ignite ignite = grid(i);
+
+ futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteCountDownLatch latch = ignite.countDownLatch("l2", 10,
+ true,
+ false);
+
+ assertNotNull(latch);
+
+ for (int i = 0; i < 3; i++) {
+ cnt.incrementAndGet();
+
+ latch.countDown();
+ }
+
+ boolean wait = latch.await(30_000);
+
+ assertEquals(gridCount() * 3, cnt.get());
+
+ assertEquals(0, latch.count());
+
+ assertTrue(wait);
+
+ return null;
+ }
+ }));
+ }
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get(30_000);
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
// No-op.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
new file mode 100644
index 0000000..602ac18
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.expiry.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheClientNearCacheExpiryTest extends IgniteCacheAbstractTest {
+ /** */
+ private static final int NODES = 3;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return NODES;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.equals(getTestGridName(NODES - 1)))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExpirationOnClient() throws Exception {
+ Ignite ignite = grid(NODES - 1);
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ assertTrue(((IgniteCacheProxy)cache).context().isNear());
+
+ for (int i = 0 ; i < 100; i++)
+ cache.put(i, i);
+
+ CreatedExpiryPolicy plc = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 500));
+
+ IgniteCache<Object, Object> cacheWithExpiry = cache.withExpiryPolicy(plc);
+
+ for (int i = 100 ; i < 200; i++) {
+ cacheWithExpiry.put(i, i);
+
+ assertEquals(i, cacheWithExpiry.localPeek(i));
+ }
+
+ U.sleep(1000);
+
+ for (int i = 0 ; i < 100; i++)
+ assertEquals(i, cacheWithExpiry.localPeek(i));
+
+ for (int i = 100 ; i < 200; i++)
+ assertNull(cache.localPeek(i));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index c006f69..c78ec5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -50,6 +50,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheTtlCleanupSelfTest.class);
+ suite.addTestSuite(IgniteCacheClientNearCacheExpiryTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 5867fb8..21f9424 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -501,6 +501,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
return false;
}
+ /** {@inheritDoc} */
+ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ // No-op.
+ }
+
/**
* @param cacheName Cache name.
* @return Map representing cache.