You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2015/10/12 12:32:15 UTC
[4/6] ignite git commit: Merge master into ignite-843
Merge master into ignite-843
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae09fa9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae09fa9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae09fa9f
Branch: refs/heads/ignite-843
Commit: ae09fa9f26b9171636f22cdc11c61a77ad3cd4f4
Parents: 49514e8 1223525
Author: Andrey <an...@gridgain.com>
Authored: Mon Oct 12 16:28:55 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Mon Oct 12 16:29:06 2015 +0700
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 479 ++++++------
.../processors/cache/GridCacheProcessor.java | 157 ++--
.../cache/query/GridCacheQueryManager.java | 719 +++++++++----------
.../processors/rest/GridRestProcessor.java | 176 +++--
.../handlers/cache/GridCacheCommandHandler.java | 313 ++++----
.../handlers/query/QueryCommandHandler.java | 156 ++--
.../top/GridTopologyCommandHandler.java | 18 +-
.../rest/request/RestQueryRequest.java | 26 +-
.../http/jetty/GridJettyRestHandler.java | 151 ++--
9 files changed, 1055 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 6aba211,9e54f6f..b74fbdb
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -134,33 -134,33 +134,24 @@@ import static org.apache.ignite.plugin.
* Discovery SPI manager.
*/
public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
++ /** Discovery cached history size. */
++ protected static final int DISCOVERY_HISTORY_SIZE = 100;
/** Fake key for {@code null}-named caches. Used inside {@link DiscoCache}. */
private static final String NULL_CACHE_NAME = UUID.randomUUID().toString();
--
/** Metrics update frequency. */
private static final long METRICS_UPDATE_FREQ = 3000;
--
/** */
private static final MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
--
/** */
private static final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
--
/** */
private static final RuntimeMXBean rt = ManagementFactory.getRuntimeMXBean();
--
/** */
private static final ThreadMXBean threads = ManagementFactory.getThreadMXBean();
--
/** */
private static final Collection<GarbageCollectorMXBean> gc = ManagementFactory.getGarbageCollectorMXBeans();
--
/** */
private static final String PREFIX = "Topology snapshot";
--
-- /** Discovery cached history size. */
-- protected static final int DISCOVERY_HISTORY_SIZE = 100;
--
/** Predicate filtering out daemon nodes. */
private static final IgnitePredicate<ClusterNode> daemonFilter = new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
@@@ -185,88 -185,88 +176,83 @@@
/** Discovery event worker. */
private final DiscoveryWorker discoWrk = new DiscoveryWorker();
--
++ /** Last logged topology. */
++ private final AtomicLong lastLoggedTop = new AtomicLong();
++ /** Last segment check result. */
++ private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true);
++ /** Topology cache history. */
++ private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist =
++ new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
++ /** Topology version. */
++ private final AtomicReference<Snapshot> topSnap =
++ new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null));
++ /** */
++ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
++ /** Received custom messages history. */
++ private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
++ /** */
++ private final CountDownLatch startLatch = new CountDownLatch(1);
/** Network segment check worker. */
private SegmentCheckWorker segChkWrk;
--
/** Network segment check thread. */
private IgniteThread segChkThread;
--
-- /** Last logged topology. */
-- private final AtomicLong lastLoggedTop = new AtomicLong();
--
/** Local node. */
private ClusterNode locNode;
--
/** Local node daemon flag. */
private boolean isLocDaemon;
--
/** {@code True} if resolvers were configured and network segment check is enabled. */
private boolean hasRslvrs;
--
-- /** Last segment check result. */
-- private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true);
--
-- /** Topology cache history. */
-- private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist =
-- new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
--
/** Topology snapshots history. */
private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>();
--
-- /** Topology version. */
-- private final AtomicReference<Snapshot> topSnap =
-- new AtomicReference<>(new Snapshot(AffinityTopologyVersion.ZERO, null));
--
/** Minor topology version. */
private int minorTopVer;
--
/** Order supported flag. */
private boolean discoOrdered;
--
/** Topology snapshots history supported flag. */
private boolean histSupported;
--
/** Configured network segment check frequency. */
private long segChkFreq;
--
/** Local node join to topology event. */
private GridFutureAdapter<DiscoveryEvent> locJoinEvt = new GridFutureAdapter<>();
--
/** GC CPU load. */
private volatile double gcCpuLoad;
--
/** CPU load. */
private volatile double cpuLoad;
--
/** Metrics. */
private final GridLocalMetrics metrics = createMetrics();
--
/** Metrics update worker. */
private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
--
/** Custom event listener. */
private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
new ConcurrentHashMap8<>();
--
/** Map of dynamic cache filters. */
private Map<String, CachePredicate> registeredCaches = new HashMap<>();
-- /** */
-- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
--
-- /** Received custom messages history. */
-- private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
--
-- /** */
-- private final CountDownLatch startLatch = new CountDownLatch(1);
--
/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
}
/**
++ * @param nodes Nodes.
++ * @return Total CPUs.
++ */
++ private static int cpus(Collection<ClusterNode> nodes) {
++ Collection<String> macSet = new HashSet<>(nodes.size(), 1.0f);
++
++ int cpus = 0;
++
++ for (ClusterNode n : nodes) {
++ String macs = n.attribute(ATTR_MACS);
++
++ if (macSet.add(macs))
++ cpus += n.metrics().getTotalCpus();
++ }
++
++ return cpus;
++ }
++
++ /**
* @return Memory usage of non-heap memory.
*/
private MemoryUsage nonHeapMemoryUsage() {
@@@ -1063,25 -1063,25 +1049,6 @@@
}
/**
-- * @param nodes Nodes.
-- * @return Total CPUs.
-- */
-- private static int cpus(Collection<ClusterNode> nodes) {
-- Collection<String> macSet = new HashSet<>(nodes.size(), 1.0f);
--
-- int cpus = 0;
--
-- for (ClusterNode n : nodes) {
-- String macs = n.attribute(ATTR_MACS);
--
-- if (macSet.add(macs))
-- cpus += n.metrics().getTotalCpus();
-- }
--
-- return cpus;
-- }
--
-- /**
* Prints the latest topology info into log taking into account logging/verbosity settings.
*/
public void ackTopology() {
@@@ -1812,105 -1812,105 +1779,291 @@@
).start();
}
- /** Worker for network segment checks. */
- private class SegmentCheckWorker extends GridWorker {
++ /** Discovery topology future. */
++ private static class DiscoTopologyFuture extends GridFutureAdapter<Long> implements GridLocalEventListener {
+ /** */
- private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
++ private static final long serialVersionUID = 0L;
+
- /**
- *
- */
- private SegmentCheckWorker() {
- super(ctx.gridName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log);
++ /** */
++ private GridKernalContext ctx;
+
- assert hasRslvrs;
- assert segChkFreq > 0;
++ /** Topology await version. */
++ private long awaitVer;
++
++ /** Empty constructor required by {@link Externalizable}. */
++ private DiscoTopologyFuture() {
++ // No-op.
+ }
+
+ /**
- *
++ * @param ctx Context.
++ * @param awaitVer Await version.
+ */
- public void scheduleSegmentCheck() {
- queue.add(new Object());
++ private DiscoTopologyFuture(GridKernalContext ctx, long awaitVer) {
++ this.ctx = ctx;
++ this.awaitVer = awaitVer;
+ }
+
- /** {@inheritDoc} */
- @SuppressWarnings("StatementWithEmptyBody")
- @Override protected void body() throws InterruptedException {
- long lastChk = 0;
++ /** Initializes future. */
++ private void init() {
++ ctx.event().addLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
- while (!isCancelled()) {
- Object req = queue.poll(2000, MILLISECONDS);
++ // Close potential window.
++ long topVer = ctx.discovery().topologyVersion();
+
- long now = U.currentTimeMillis();
++ if (topVer >= awaitVer)
++ onDone(topVer);
++ }
+
- // Check frequency if segment check has not been requested.
- if (req == null && (segChkFreq == 0 || lastChk + segChkFreq >= now)) {
- if (log.isDebugEnabled())
- log.debug("Skipping segment check as it has not been requested and it is not time to check.");
++ /** {@inheritDoc} */
++ @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) {
++ if (super.onDone(res, err)) {
++ ctx.event().removeLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
- continue;
- }
++ return true;
++ }
+
- // We should always check segment if it has been explicitly
- // requested (on any node failure or leave).
- assert req != null || lastChk + segChkFreq < now;
++ return false;
++ }
+
- // Drain queue.
- while (queue.poll() != null) {
- // No-op.
- }
++ /** {@inheritDoc} */
++ @Override public void onEvent(Event evt) {
++ assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
- if (lastSegChkRes.get()) {
- boolean segValid = ctx.segmentation().isValidSegment();
++ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
- lastChk = now;
++ if (discoEvt.topologyVersion() >= awaitVer)
++ onDone(discoEvt.topologyVersion());
++ }
++ }
+
- if (!segValid) {
- discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(),
- Collections.<ClusterNode>emptyList(), null);
++ /**
++ *
++ */
++ private static class Snapshot {
++ /** */
++ private final AffinityTopologyVersion topVer;
+
- lastSegChkRes.set(false);
- }
++ /** */
++ @GridToStringExclude
++ private final DiscoCache discoCache;
+
- if (log.isDebugEnabled())
- log.debug("Segment has been checked [requested=" + (req != null) + ", valid=" + segValid + ']');
- }
- }
++ /**
++ * @param topVer Topology version.
++ * @param discoCache Disco cache.
++ */
++ private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) {
++ this.topVer = topVer;
++ this.discoCache = discoCache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
- return S.toString(SegmentCheckWorker.class, this);
++ return S.toString(Snapshot.class, this);
+ }
+ }
+
- /** Worker for discovery events. */
- private class DiscoveryWorker extends GridWorker {
- /** Event queue. */
- private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
- DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
++ /**
++ * Cache predicate.
++ */
++ private static class CachePredicate {
++ /** Cache filter. */
++ private final IgnitePredicate<ClusterNode> cacheFilter;
+
- /** Node segmented event fired flag. */
- private boolean nodeSegFired;
++ /** If near cache is enabled on data nodes. */
++ private final boolean nearEnabled;
++
++ /** Cache mode. */
++ private final CacheMode cacheMode;
++
++ /** Collection of client near nodes. */
++ private final ConcurrentHashMap<UUID, Boolean> clientNodes;
+
+ /**
- *
++ * @param cacheFilter Cache filter.
++ * @param nearEnabled Near enabled flag.
++ * @param cacheMode Cache mode.
+ */
- private DiscoveryWorker() {
- super(ctx.gridName(), "disco-event-worker", GridDiscoveryManager.this.log);
++ private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) {
++ assert cacheFilter != null;
++
++ this.cacheFilter = cacheFilter;
++ this.nearEnabled = nearEnabled;
++ this.cacheMode = cacheMode;
++
++ clientNodes = new ConcurrentHashMap<>();
+ }
+
+ /**
- * Method is called when any discovery event occurs.
- *
- * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
- * @param topVer Topology version.
- * @param node Remote node this event is connected with.
- * @param topSnapshot Topology snapshot.
++ * @param nodeId Near node ID to add.
++ * @param nearEnabled Near enabled flag.
++ * @return {@code True} if new node ID was added.
+ */
- @SuppressWarnings("RedundantTypeArguments")
- private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
++ public boolean addClientNode(UUID nodeId, boolean nearEnabled) {
++ assert nodeId != null;
++
++ Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled);
++
++ return old == null;
++ }
++
++ /**
++ * @param leftNodeId Left node ID.
++ * @return {@code True} if existing node ID was removed.
++ */
++ public boolean onNodeLeft(UUID leftNodeId) {
++ assert leftNodeId != null;
++
++ Boolean old = clientNodes.remove(leftNodeId);
++
++ return old != null;
++ }
++
++ /**
++ * @param node Node to check.
++ * @return {@code True} if this node is a data node for given cache.
++ */
++ public boolean dataNode(ClusterNode node) {
++ return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
++ }
++
++ /**
++ * @param node Node to check.
++ * @return {@code True} if cache is accessible on the given node.
++ */
++ public boolean cacheNode(ClusterNode node) {
++ return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
++ }
++
++ /**
++ * @param node Node to check.
++ * @return {@code True} if near cache is present on the given nodes.
++ */
++ public boolean nearNode(ClusterNode node) {
++ if (node.isDaemon())
++ return false;
++
++ if (CU.affinityNode(node, cacheFilter))
++ return nearEnabled;
++
++ Boolean near = clientNodes.get(node.id());
++
++ return near != null && near;
++ }
++
++ /**
++ * @param node Node to check.
++ * @return {@code True} if client cache is present on the given nodes.
++ */
++ public boolean clientNode(ClusterNode node) {
++ if (node.isDaemon())
++ return false;
++
++ Boolean near = clientNodes.get(node.id());
++
++ return near != null && !near;
++ }
++ }
++
+ /** Worker for network segment checks. */
+ private class SegmentCheckWorker extends GridWorker {
+ /** */
+ private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
+
+ /**
+ *
+ */
+ private SegmentCheckWorker() {
+ super(ctx.gridName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log);
+
+ assert hasRslvrs;
+ assert segChkFreq > 0;
+ }
+
+ /**
+ *
+ */
+ public void scheduleSegmentCheck() {
+ queue.add(new Object());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("StatementWithEmptyBody")
+ @Override protected void body() throws InterruptedException {
+ long lastChk = 0;
+
+ while (!isCancelled()) {
+ Object req = queue.poll(2000, MILLISECONDS);
+
+ long now = U.currentTimeMillis();
+
+ // Check frequency if segment check has not been requested.
+ if (req == null && (segChkFreq == 0 || lastChk + segChkFreq >= now)) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping segment check as it has not been requested and it is not time to check.");
+
+ continue;
+ }
+
+ // We should always check segment if it has been explicitly
+ // requested (on any node failure or leave).
+ assert req != null || lastChk + segChkFreq < now;
+
+ // Drain queue.
+ while (queue.poll() != null) {
+ // No-op.
+ }
+
+ if (lastSegChkRes.get()) {
+ boolean segValid = ctx.segmentation().isValidSegment();
+
+ lastChk = now;
+
+ if (!segValid) {
+ discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(),
+ Collections.<ClusterNode>emptyList(), null);
+
+ lastSegChkRes.set(false);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Segment has been checked [requested=" + (req != null) + ", valid=" + segValid + ']');
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SegmentCheckWorker.class, this);
+ }
+ }
+
+ /** Worker for discovery events. */
+ private class DiscoveryWorker extends GridWorker {
+ /** Event queue. */
+ private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
+ DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
+
+ /** Node segmented event fired flag. */
+ private boolean nodeSegFired;
+
+ /**
+ *
+ */
+ private DiscoveryWorker() {
+ super(ctx.gridName(), "disco-event-worker", GridDiscoveryManager.this.log);
+ }
+
+ /**
+ * Method is called when any discovery event occurs.
+ *
+ * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
+ * @param topVer Topology version.
+ * @param node Remote node this event is connected with.
+ * @param topSnapshot Topology snapshot.
+ */
+ @SuppressWarnings("RedundantTypeArguments")
+ private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
assert node != null;
if (ctx.event().isRecordable(type)) {
@@@ -2279,90 -2279,90 +2432,6 @@@
}
}
-- /** Discovery topology future. */
-- private static class DiscoTopologyFuture extends GridFutureAdapter<Long> implements GridLocalEventListener {
-- /** */
-- private static final long serialVersionUID = 0L;
--
-- /** */
-- private GridKernalContext ctx;
--
-- /** Topology await version. */
-- private long awaitVer;
--
-- /** Empty constructor required by {@link Externalizable}. */
-- private DiscoTopologyFuture() {
-- // No-op.
-- }
--
-- /**
-- * @param ctx Context.
-- * @param awaitVer Await version.
-- */
-- private DiscoTopologyFuture(GridKernalContext ctx, long awaitVer) {
-- this.ctx = ctx;
-- this.awaitVer = awaitVer;
-- }
--
-- /** Initializes future. */
-- private void init() {
-- ctx.event().addLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
--
-- // Close potential window.
-- long topVer = ctx.discovery().topologyVersion();
--
-- if (topVer >= awaitVer)
-- onDone(topVer);
-- }
--
-- /** {@inheritDoc} */
-- @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) {
-- if (super.onDone(res, err)) {
-- ctx.event().removeLocalEventListener(this, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
--
-- return true;
-- }
--
-- return false;
-- }
--
-- /** {@inheritDoc} */
-- @Override public void onEvent(Event evt) {
-- assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
--
-- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
--
-- if (discoEvt.topologyVersion() >= awaitVer)
-- onDone(discoEvt.topologyVersion());
-- }
-- }
--
-- /**
-- *
-- */
-- private static class Snapshot {
-- /** */
-- private final AffinityTopologyVersion topVer;
--
-- /** */
-- @GridToStringExclude
-- private final DiscoCache discoCache;
--
-- /**
-- * @param topVer Topology version.
-- * @param discoCache Disco cache.
-- */
-- private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) {
-- this.topVer = topVer;
-- this.discoCache = discoCache;
-- }
--
-- /** {@inheritDoc} */
-- @Override public String toString() {
-- return S.toString(Snapshot.class, this);
-- }
-- }
--
/** Cache for discovery collections. */
private class DiscoCache {
/** Remote nodes. */
@@@ -2841,106 -2841,583 +2910,4 @@@
return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes));
}
}
--
- /**
- /** Cache for discovery collections. */
- private class DiscoCache {
- /** Remote nodes. */
- private final List<ClusterNode> rmtNodes;
-
- /** All nodes. */
- private final List<ClusterNode> allNodes;
-
- /** All nodes with at least one cache configured. */
- @GridToStringInclude
- private final Collection<ClusterNode> allNodesWithCaches;
-
- /** All nodes with at least one cache configured. */
- @GridToStringInclude
- private final Collection<ClusterNode> rmtNodesWithCaches;
-
- /** Cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> allCacheNodes;
-
- /** Remote cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
-
- /** Cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> affCacheNodes;
-
- /** Caches where at least one node has near cache enabled. */
- @GridToStringInclude
- private final Set<String> nearEnabledCaches;
-
- /** Nodes grouped by version. */
- private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer;
-
- /** Daemon nodes. */
- private final List<ClusterNode> daemonNodes;
-
- /** Node map. */
- private final Map<UUID, ClusterNode> nodeMap;
-
- /** Local node. */
- private final ClusterNode loc;
-
- /** Highest node order. */
- private final long maxOrder;
-
- /**
- * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes;
-
- /**
- * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
-
- /**
- * Cached alive remote nodes with caches.
- */
- private final Collection<ClusterNode> aliveNodesWithCaches;
-
- /**
- * Cached alive server remote nodes with caches.
- */
- private final Collection<ClusterNode> aliveSrvNodesWithCaches;
-
- /**
- * Cached alive remote server nodes with caches.
- */
- private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
-
- /**
- * @param loc Local node.
- * @param rmts Remote nodes.
- */
- private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) {
- this.loc = loc;
-
- rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, daemonFilter)));
-
- assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
- " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
-
- List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1);
-
- if (!loc.isDaemon())
- all.add(loc);
-
- all.addAll(rmtNodes);
-
- Collections.sort(all, GridNodeOrderComparator.INSTANCE);
-
- allNodes = Collections.unmodifiableList(all);
-
- Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f);
- Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
- Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
-
- aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
- nodesByVer = new TreeMap<>();
-
- long maxOrder0 = 0;
-
- Set<String> nearEnabledSet = new HashSet<>();
-
- for (ClusterNode node : allNodes) {
- assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
-
- if (node.order() > maxOrder0)
- maxOrder0 = node.order();
-
- boolean hasCaches = false;
-
- for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
- String cacheName = entry.getKey();
-
- CachePredicate filter = entry.getValue();
-
- if (filter.cacheNode(node)) {
- nodesWithCaches.add(node);
-
- if (!loc.id().equals(node.id()))
- rmtNodesWithCaches.add(node);
-
- addToMap(cacheMap, cacheName, node);
-
- if (alive(node.id()))
- addToMap(aliveCacheNodes, maskNull(cacheName), node);
-
- if (filter.dataNode(node))
- addToMap(dhtNodesMap, cacheName, node);
-
- if (filter.nearNode(node))
- nearEnabledSet.add(cacheName);
-
- if (!loc.id().equals(node.id())) {
- addToMap(rmtCacheMap, cacheName, node);
-
- if (alive(node.id()))
- addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
- }
-
- hasCaches = true;
- }
- }
-
- if (hasCaches) {
- if (alive(node.id())) {
- aliveNodesWithCaches.add(node);
-
- if (!CU.clientNode(node)) {
- aliveSrvNodesWithCaches.add(node);
-
- if (!loc.id().equals(node.id()))
- aliveRmtSrvNodesWithCaches.add(node);
- }
- }
- }
-
- IgniteProductVersion nodeVer = U.productVersion(node);
-
- // Create collection for this version if it does not exist.
- Collection<ClusterNode> nodes = nodesByVer.get(nodeVer);
-
- if (nodes == null) {
- nodes = new ArrayList<>(allNodes.size());
-
- nodesByVer.put(nodeVer, nodes);
- }
-
- nodes.add(node);
- }
-
- // Need second iteration to add this node to all previous node versions.
- for (ClusterNode node : allNodes) {
- IgniteProductVersion nodeVer = U.productVersion(node);
-
- // Get all versions lower or equal node's version.
- NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView =
- nodesByVer.headMap(nodeVer, false);
-
- for (Collection<ClusterNode> prevVersions : updateView.values())
- prevVersions.add(node);
- }
-
- maxOrder = maxOrder0;
-
- allCacheNodes = Collections.unmodifiableMap(cacheMap);
- rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
- affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
- allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
- this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
- nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet);
-
- daemonNodes = Collections.unmodifiableList(new ArrayList<>(
- F.view(F.concat(false, loc, rmts), F0.not(daemonFilter))));
-
- Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f);
-
- for (ClusterNode n : F.concat(false, allNodes(), daemonNodes()))
- nodeMap.put(n.id(), n);
-
- this.nodeMap = nodeMap;
- }
-
- /**
- * Adds node to map.
- *
- * @param cacheMap Map to add to.
- * @param cacheName Cache name.
- * @param rich Node to add
- */
- private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
- Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName);
-
- if (cacheNodes == null) {
- cacheNodes = new ArrayList<>(allNodes.size());
-
- cacheMap.put(cacheName, cacheNodes);
- }
-
- cacheNodes.add(rich);
- }
-
- /** @return Local node. */
- ClusterNode localNode() {
- return loc;
- }
-
- /** @return Remote nodes. */
- Collection<ClusterNode> remoteNodes() {
- return rmtNodes;
- }
-
- /** @return All nodes. */
- Collection<ClusterNode> allNodes() {
- return allNodes;
- }
-
- /**
- * Gets collection of nodes which have version equal or greater than {@code ver}.
- *
- * @param ver Version to check.
- * @return Collection of nodes with version equal or greater than {@code ver}.
- */
- Collection<ClusterNode> elderNodes(IgniteProductVersion ver) {
- Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver);
-
- if (entry == null)
- return Collections.emptyList();
-
- return entry.getValue();
- }
-
- /**
- * @return Versions map.
- */
- NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() {
- return nodesByVer;
- }
-
- /**
- * Gets collection of nodes with at least one cache configured.
- *
- * @param topVer Topology version (maximum allowed node order).
- * @return Collection of nodes.
- */
- Collection<ClusterNode> allNodesWithCaches(final long topVer) {
- return filter(topVer, allNodesWithCaches);
- }
-
- /**
- * Gets all nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, allCacheNodes.get(cacheName));
- }
-
- /**
- * Gets all remote nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, rmtCacheNodes.get(cacheName));
- }
-
- /**
- * Gets all remote nodes that have at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> remoteCacheNodes(final long topVer) {
- return filter(topVer, rmtNodesWithCaches);
- }
-
- /**
- * Gets all nodes that have cache with given name and should participate in affinity calculation. With
- * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, affCacheNodes.get(cacheName));
- }
-
- /**
- * Gets all alive nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveCacheNodes.get(maskNull(cacheName)));
- }
-
- /**
- * Gets all alive remote nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName)));
- }
-
- /**
- * Gets all alive remote server nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
- return filter(topVer, aliveRmtSrvNodesWithCaches);
- }
-
- /**
- * Gets all alive server nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
- return filter(topVer, aliveSrvNodesWithCaches);
- }
-
- /**
- * Gets all alive remote nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveNodesWithCaches(final long topVer) {
- return filter(topVer, aliveNodesWithCaches);
- }
-
- /**
- * Checks if cache with given name has at least one node with near cache enabled.
- *
- * @param cacheName Cache name.
- * @return {@code True} if cache with given name has at least one node with near cache enabled.
- */
- boolean hasNearCache(@Nullable String cacheName) {
- return nearEnabledCaches.contains(cacheName);
- }
-
- /**
- * Removes left node from cached alives lists.
- *
- * @param leftNode Left node.
- */
- void updateAlives(ClusterNode leftNode) {
- if (leftNode.order() > maxOrder)
- return;
-
- filterNodeMap(aliveCacheNodes, leftNode);
-
- filterNodeMap(aliveRmtCacheNodes, leftNode);
-
- aliveNodesWithCaches.remove(leftNode);
- aliveSrvNodesWithCaches.remove(leftNode);
- aliveRmtSrvNodesWithCaches.remove(leftNode);
- }
-
- /**
- * Creates a copy of nodes map without the given node.
- *
- * @param map Map to copy.
- * @param exclNode Node to exclude.
- */
- private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) {
- for (String cacheName : registeredCaches.keySet()) {
- String maskedName = maskNull(cacheName);
-
- while (true) {
- Collection<ClusterNode> oldNodes = map.get(maskedName);
-
- if (oldNodes == null || oldNodes.isEmpty())
- break;
-
- Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes);
-
- if (!newNodes.remove(exclNode))
- break;
-
- if (map.replace(maskedName, oldNodes, newNodes))
- break;
- }
- }
- }
-
- /**
- * Replaces {@code null} with {@code NULL_CACHE_NAME}.
- *
- * @param cacheName Cache name.
- * @return Masked name.
- */
- private String maskNull(@Nullable String cacheName) {
- return cacheName == null ? NULL_CACHE_NAME : cacheName;
- }
-
- /**
- * @param topVer Topology version.
- * @param nodes Nodes.
- * @return Filtered collection (potentially empty, but never {@code null}).
- */
- private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) {
- if (nodes == null)
- return Collections.emptyList();
-
- // If no filtering needed, return original collection.
- return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ?
- nodes :
- F.view(nodes, new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return node.order() <= topVer;
- }
- });
- }
-
- /** @return Daemon nodes. */
- Collection<ClusterNode> daemonNodes() {
- return daemonNodes;
- }
-
- /**
- * @param id Node ID.
- * @return Node.
- */
- @Nullable ClusterNode node(UUID id) {
- return nodeMap.get(id);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes));
- }
- } /**
-- * Cache predicate.
-- */
-- private static class CachePredicate {
-- /** Cache filter. */
-- private final IgnitePredicate<ClusterNode> cacheFilter;
--
-- /** If near cache is enabled on data nodes. */
-- private final boolean nearEnabled;
--
-- /** Cache mode. */
-- private final CacheMode cacheMode;
--
-- /** Collection of client near nodes. */
-- private final ConcurrentHashMap<UUID, Boolean> clientNodes;
--
-- /**
-- * @param cacheFilter Cache filter.
-- * @param nearEnabled Near enabled flag.
-- * @param cacheMode Cache mode.
-- */
-- private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) {
-- assert cacheFilter != null;
--
-- this.cacheFilter = cacheFilter;
-- this.nearEnabled = nearEnabled;
-- this.cacheMode = cacheMode;
--
-- clientNodes = new ConcurrentHashMap<>();
-- }
--
-- /**
-- * @param nodeId Near node ID to add.
-- * @param nearEnabled Near enabled flag.
-- * @return {@code True} if new node ID was added.
-- */
-- public boolean addClientNode(UUID nodeId, boolean nearEnabled) {
-- assert nodeId != null;
--
-- Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled);
--
-- return old == null;
-- }
--
-- /**
-- * @param leftNodeId Left node ID.
-- * @return {@code True} if existing node ID was removed.
-- */
-- public boolean onNodeLeft(UUID leftNodeId) {
-- assert leftNodeId != null;
--
-- Boolean old = clientNodes.remove(leftNodeId);
--
-- return old != null;
-- }
--
-- /**
-- * @param node Node to check.
-- * @return {@code True} if this node is a data node for given cache.
-- */
-- public boolean dataNode(ClusterNode node) {
-- return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
-- }
--
-- /**
-- * @param node Node to check.
-- * @return {@code True} if cache is accessible on the given node.
-- */
-- public boolean cacheNode(ClusterNode node) {
-- return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
-- }
--
-- /**
-- * @param node Node to check.
-- * @return {@code True} if near cache is present on the given nodes.
-- */
-- public boolean nearNode(ClusterNode node) {
-- if (node.isDaemon())
-- return false;
--
-- if (CU.affinityNode(node, cacheFilter))
-- return nearEnabled;
--
-- Boolean near = clientNodes.get(node.id());
--
-- return near != null && near;
-- }
--
-- /**
-- * @param node Node to check.
-- * @return {@code True} if client cache is present on the given nodes.
-- */
-- public boolean clientNode(ClusterNode node) {
-- if (node.isDaemon())
-- return false;
--
-- Boolean near = clientNodes.get(node.id());
--
-- return near != null && !near;
-- }
-- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 8c96c0c,736e630..a9919f8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -149,52 -149,52 +149,36 @@@ import static org.apache.ignite.transac
public class GridCacheProcessor extends GridProcessorAdapter {
/** Null cache name. */
private static final String NULL_NAME = U.id8(UUID.randomUUID());
--
-- /** Shared cache context. */
-- private GridCacheSharedContext<?, ?> sharedCtx;
--
/** */
private final Map<String, GridCacheAdapter<?, ?>> caches;
--
/** Caches stopped from onKernalStop callback. */
private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>();
--
/** Map of proxies. */
private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
--
/** Map of preload finish futures grouped by preload order. */
private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
--
-- /** Maximum detected rebalance order. */
-- private int maxRebalanceOrder;
--
/** Caches stop sequence. */
private final Deque<String> stopSeq;
--
++ /** Count down latch for caches. */
++ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
++ /** Shared cache context. */
++ private GridCacheSharedContext<?, ?> sharedCtx;
++ /** Maximum detected rebalance order. */
++ private int maxRebalanceOrder;
/** Transaction interface implementation. */
private IgniteTransactionsImpl transactions;
--
/** Pending cache starts. */
private ConcurrentMap<String, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();
--
/** Template configuration add futures. */
private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();
--
/** Dynamic caches. */
private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
--
/** Cache templates. */
private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
--
/** */
private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
--
/** Must use JDK marshaller since it is used by discovery to fire custom events. */
private Marshaller marshaller = new JdkMarshaller();
--
-- /** Count down latch for caches. */
-- private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
--
/** */
private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
@@@ -215,6 -215,6 +199,24 @@@
}
/**
++ * @param name Name to mask.
++ * @return Masked name.
++ */
++ private static String maskNull(String name) {
++ return name == null ? NULL_NAME : name;
++ }
++
++ /**
++ * @param name Name to unmask.
++ * @return Unmasked name.
++ */
++ @SuppressWarnings("StringEquality")
++ private static String unmaskNull(String name) {
++ // Intentional identity equality.
++ return name == NULL_NAME ? null : name;
++ }
++
++ /**
* @param internalCache Internal cache flag.
* @param cfg Initializes cache configuration with proper defaults.
* @param cacheObjCtx Cache object context.
@@@ -2261,7 -2261,7 +2263,6 @@@
return F.first(initiateCacheChanges(F.asList(t), false));
}
--
/**
* @param cacheName Cache name to close.
* @return Future that will be completed when cache is closed.
@@@ -3365,21 -3365,21 +3366,54 @@@
}
/**
-- * @param name Name to mask.
-- * @return Masked name.
++ *
*/
-- private static String maskNull(String name) {
-- return name == null ? NULL_NAME : name;
-- }
++ private static class LocalAffinityFunction implements AffinityFunction {
++ /** */
++ private static final long serialVersionUID = 0L;
-- /**
-- * @param name Name to unmask.
-- * @return Unmasked name.
-- */
-- @SuppressWarnings("StringEquality")
-- private static String unmaskNull(String name) {
-- // Intentional identity equality.
-- return name == NULL_NAME ? null : name;
++ /** {@inheritDoc} */
++ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
++ ClusterNode locNode = null;
++
++ for (ClusterNode n : affCtx.currentTopologySnapshot()) {
++ if (n.isLocal()) {
++ locNode = n;
++
++ break;
++ }
++ }
++
++ if (locNode == null)
++ throw new IgniteException("Local node is not included into affinity nodes for 'LOCAL' cache");
++
++ List<List<ClusterNode>> res = new ArrayList<>(partitions());
++
++ for (int part = 0; part < partitions(); part++)
++ res.add(Collections.singletonList(locNode));
++
++ return Collections.unmodifiableList(res);
++ }
++
++ /** {@inheritDoc} */
++ @Override public void reset() {
++ // No-op.
++ }
++
++ /** {@inheritDoc} */
++ @Override public int partitions() {
++ return 1;
++ }
++
++ /** {@inheritDoc} */
++ @Override public int partition(Object key) {
++ return 0;
++ }
++
++ /** {@inheritDoc} */
++ @Override public void removeNode(UUID nodeId) {
++ // No-op.
++ }
}
/**
@@@ -3478,55 -3478,95 +3512,4 @@@
return S.toString(TemplateConfigurationFuture.class, this);
}
}
--
-- /**
- *
- */
- @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
- private class TemplateConfigurationFuture extends GridFutureAdapter<Object> {
- /** Start ID. */
- @GridToStringInclude
- private IgniteUuid deploymentId;
-
- /** Cache name. */
- private String cacheName;
-
- /**
- * @param cacheName Cache name.
- * @param deploymentId Deployment ID.
- */
- private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) {
- this.deploymentId = deploymentId;
- this.cacheName = cacheName;
- }
-
- /**
- * @return Start ID.
- */
- public IgniteUuid deploymentId() {
- return deploymentId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
- // Make sure to remove future before completion.
- pendingTemplateFuts.remove(maskNull(cacheName), this);
-
- return super.onDone(res, err);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TemplateConfigurationFuture.class, this);
- }
- } /**
-- *
-- */
-- private static class LocalAffinityFunction implements AffinityFunction {
-- /** */
-- private static final long serialVersionUID = 0L;
--
-- /** {@inheritDoc} */
-- @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
-- ClusterNode locNode = null;
--
-- for (ClusterNode n : affCtx.currentTopologySnapshot()) {
-- if (n.isLocal()) {
-- locNode = n;
--
-- break;
-- }
-- }
--
-- if (locNode == null)
-- throw new IgniteException("Local node is not included into affinity nodes for 'LOCAL' cache");
--
-- List<List<ClusterNode>> res = new ArrayList<>(partitions());
--
-- for (int part = 0; part < partitions(); part++)
-- res.add(Collections.singletonList(locNode));
--
-- return Collections.unmodifiableList(res);
-- }
--
-- /** {@inheritDoc} */
-- @Override public void reset() {
-- // No-op.
-- }
--
-- /** {@inheritDoc} */
-- @Override public int partitions() {
-- return 1;
-- }
--
-- /** {@inheritDoc} */
-- @Override public int partition(Object key) {
-- return 0;
-- }
--
-- /** {@inheritDoc} */
-- @Override public void removeNode(UUID nodeId) {
-- // No-op.
-- }
-- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index f907d5b,698b035..c94cc46
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -136,33 -136,33 +136,24 @@@ import static org.apache.ignite.interna
public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
/** */
public static int MAX_ITERATORS = 1000;
--
++ /** */
++ private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<QueryResult<K, V>>>> qryIters =
++ new ConcurrentHashMap8<>();
++ /** */
++ private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<FieldsResult>>> fieldsQryRes =
++ new ConcurrentHashMap8<>();
++ /** */
++ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** */
protected GridQueryProcessor qryProc;
--
/** */
private String space;
--
/** */
private int maxIterCnt;
--
/** */
private volatile GridCacheQueryMetricsAdapter metrics = new GridCacheQueryMetricsAdapter();
--
-- /** */
-- private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<QueryResult<K, V>>>> qryIters =
-- new ConcurrentHashMap8<>();
--
-- /** */
-- private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<FieldsResult>>> fieldsQryRes =
-- new ConcurrentHashMap8<>();
--
/** */
private volatile ConcurrentMap<Object, CachedResult<?>> qryResCache = new ConcurrentHashMap8<>();
--
-- /** */
-- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
--
/** Event listener. */
private GridLocalEventListener lsnr;
@@@ -172,6 -172,6 +163,17 @@@
/** */
private AffinityTopologyVersion qryTopVer;
++ /**
++ * @param sndId Sender node ID.
++ * @param reqId Request ID.
++ * @return Recipient ID.
++ */
++ private static Object recipient(UUID sndId, long reqId) {
++ assert sndId != null;
++
++ return new IgniteBiTuple<>(sndId, reqId);
++ }
++
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
qryProc = cctx.kernalContext().query();
@@@ -1715,17 -1715,17 +1717,6 @@@
}
/**
-- * @param sndId Sender node ID.
-- * @param reqId Request ID.
-- * @return Recipient ID.
-- */
-- private static Object recipient(UUID sndId, long reqId) {
-- assert sndId != null;
--
-- return new IgniteBiTuple<>(sndId, reqId);
-- }
--
-- /**
* @param qryInfo Info.
* @return Iterator.
* @throws IgniteCheckedException In case of error.
@@@ -2031,6 -2031,6 +2022,89 @@@
}
/**
++ * Query for {@link IndexingSpi}.
++ *
++ * @param keepPortable Keep portable flag.
++ * @return Query.
++ */
++ public <R> CacheQuery<R> createSpiQuery(boolean keepPortable) {
++ return new GridCacheQueryAdapter<>(cctx,
++ SPI,
++ null,
++ null,
++ null,
++ null,
++ false,
++ keepPortable);
++ }
++
++ /**
++ * Creates user's predicate based scan query.
++ *
++ * @param filter Scan filter.
++ * @param part Partition.
++ * @param keepPortable Keep portable flag.
++ * @return Created query.
++ */
++ public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
++ @Nullable Integer part, boolean keepPortable) {
++
++ return new GridCacheQueryAdapter<>(cctx,
++ SCAN,
++ null,
++ null,
++ (IgniteBiPredicate<Object, Object>)filter,
++ part,
++ false,
++ keepPortable);
++ }
++
++ /**
++ * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery}
++ * documentation.
++ *
++ * @param clsName Query class name.
++ * @param search Search clause.
++ * @param keepPortable Keep portable flag.
++ * @return Created query.
++ */
++ public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
++ String search, boolean keepPortable) {
++ A.notNull("clsName", clsName);
++ A.notNull("search", search);
++
++ return new GridCacheQueryAdapter<>(cctx,
++ TEXT,
++ clsName,
++ search,
++ null,
++ null,
++ false,
++ keepPortable);
++ }
++
++ /**
++ * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
++ * documentation.
++ *
++ * @param qry Query.
++ * @param keepPortable Keep portable flag.
++ * @return Created query.
++ */
++ public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
++ A.notNull(qry, "qry");
++
++ return new GridCacheQueryAdapter<>(cctx,
++ SQL_FIELDS,
++ null,
++ qry,
++ null,
++ null,
++ false,
++ keepPortable);
++ }
++
++ /**
* Metadata job.
*/
@GridInternal
@@@ -2435,315 -2435,313 +2509,98 @@@
/**
*
*/
-- private abstract class AbstractLazySwapEntry {
++ private static class CompoundIterator<T> extends GridIteratorAdapter<T> {
/** */
-- private K key;
++ private static final long serialVersionUID = 4585888051556166304L;
/** */
-- private V val;
++ private final List<GridIterator<T>> iters;
-- /**
-- * @return Key bytes.
-- */
-- protected abstract byte[] keyBytes();
++ /** */
++ private int idx;
-- /**
-- * @return Value.
-- * @throws IgniteCheckedException If failed.
-- */
-- protected abstract V unmarshalValue() throws IgniteCheckedException;
++ /** */
++ private GridIterator<T> iter;
/**
-- * @return Key.
++ * @param iters Iterators.
*/
-- K key() {
-- try {
-- if (key != null)
-- return key;
++ private CompoundIterator(List<GridIterator<T>> iters) {
++ if (iters.isEmpty())
++ throw new IllegalArgumentException();
-- key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false);
++ this.iters = iters;
-- return key;
-- }
-- catch (IgniteCheckedException e) {
-- throw new IgniteException(e);
-- }
++ iter = F.first(iters);
}
-- /**
-- * @return Value.
-- */
-- V value() {
-- try {
-- if (val != null)
-- return val;
++ /** {@inheritDoc} */
++ @Override public boolean hasNextX() throws IgniteCheckedException {
++ if (iter.hasNextX())
++ return true;
-- val = unmarshalValue();
++ idx++;
-- return val;
-- }
-- catch (IgniteCheckedException e) {
-- throw new IgniteException(e);
++ while (idx < iters.size()) {
++ iter = iters.get(idx);
++
++ if (iter.hasNextX())
++ return true;
++
++ idx++;
}
++
++ return false;
}
-- /**
-- * @return TTL.
-- */
-- abstract long timeToLive();
++ /** {@inheritDoc} */
++ @Override public T nextX() throws IgniteCheckedException {
++ if (!hasNextX())
++ throw new NoSuchElementException();
-- /**
-- * @return Expire time.
-- */
-- abstract long expireTime();
++ return iter.nextX();
++ }
-- /**
-- * @return Version.
-- */
-- abstract GridCacheVersion version();
++ /** {@inheritDoc} */
++ @Override public void removeX() throws IgniteCheckedException {
++ throw new UnsupportedOperationException();
++ }
}
/**
-- *
++ * Cached result.
*/
-- private class LazySwapEntry extends AbstractLazySwapEntry {
++ private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
++ /** Absolute position of each recipient. */
++ private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
/** */
-- private final Map.Entry<byte[], byte[]> e;
++ private CircularQueue<R> queue;
++ /** */
++ private int pruned;
/**
-- * @param e Entry with
++ * @param rcpt ID of the recipient.
*/
-- LazySwapEntry(Map.Entry<byte[], byte[]> e) {
-- this.e = e;
-- }
++ protected CachedResult(Object rcpt) {
++ boolean res = addRecipient(rcpt);
-- /** {@inheritDoc} */
-- @Override protected byte[] keyBytes() {
-- return e.getKey();
++ assert res;
}
-- /** {@inheritDoc} */
-- @SuppressWarnings("IfMayBeConditional")
-- @Override protected V unmarshalValue() throws IgniteCheckedException {
-- IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
++ /**
++ * Close if this result does not have any other recipients.
++ *
++ * @param rcpt ID of the recipient.
++ * @throws IgniteCheckedException If failed.
++ */
++ public void closeIfNotShared(Object rcpt) throws IgniteCheckedException {
++ assert isDone();
-- CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
++ synchronized (recipients) {
++ if (recipients.isEmpty())
++ return;
-- return obj.value(cctx.cacheObjectContext(), false);
-- }
--
-- /** {@inheritDoc} */
-- @Override long timeToLive() {
-- return GridCacheSwapEntryImpl.timeToLive(e.getValue());
-- }
--
-- /** {@inheritDoc} */
-- @Override long expireTime() {
-- return GridCacheSwapEntryImpl.expireTime(e.getValue());
-- }
--
-- /** {@inheritDoc} */
-- @Override GridCacheVersion version() {
-- return GridCacheSwapEntryImpl.version(e.getValue());
-- }
-- }
--
-- /**
-- *
-- */
-- private class LazyOffheapEntry extends AbstractLazySwapEntry {
-- /** */
-- private final T2<Long, Integer> keyPtr;
--
-- /** */
-- private final T2<Long, Integer> valPtr;
--
-- /**
-- * @param keyPtr Key address.
-- * @param valPtr Value address.
-- */
-- private LazyOffheapEntry(T2<Long, Integer> keyPtr, T2<Long, Integer> valPtr) {
-- assert keyPtr != null;
-- assert valPtr != null;
--
-- this.keyPtr = keyPtr;
-- this.valPtr = valPtr;
-- }
--
-- /** {@inheritDoc} */
-- @Override protected byte[] keyBytes() {
-- return U.copyMemory(keyPtr.get1(), keyPtr.get2());
-- }
--
-- /** {@inheritDoc} */
-- @Override protected V unmarshalValue() throws IgniteCheckedException {
-- long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
--
-- CacheObject obj = cctx.fromOffheap(ptr, false);
--
-- V val = CU.value(obj, cctx, false);
--
-- assert val != null;
--
-- return val;
-- }
--
-- /** {@inheritDoc} */
-- @Override long timeToLive() {
-- return GridCacheOffheapSwapEntry.timeToLive(valPtr.get1());
-- }
--
-- /** {@inheritDoc} */
-- @Override long expireTime() {
-- return GridCacheOffheapSwapEntry.expireTime(valPtr.get1());
-- }
--
-- /** {@inheritDoc} */
-- @Override GridCacheVersion version() {
-- return GridCacheOffheapSwapEntry.version(valPtr.get1());
-- }
-- }
--
-- /**
-- *
-- */
-- private class OffheapIteratorClosure
-- extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> {
-- /** */
-- private static final long serialVersionUID = 7410163202728985912L;
--
-- /** */
-- private IgniteBiPredicate<K, V> filter;
--
-- /** */
-- private boolean keepPortable;
--
-- /**
-- * @param filter Filter.
-- * @param keepPortable Keep portable flag.
-- */
-- private OffheapIteratorClosure(
-- @Nullable IgniteBiPredicate<K, V> filter,
-- boolean keepPortable) {
-- assert filter != null;
--
-- this.filter = filter;
-- this.keepPortable = keepPortable;
-- }
--
-- /** {@inheritDoc} */
-- @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr,
-- T2<Long, Integer> valPtr)
-- throws IgniteCheckedException {
-- LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr);
--
-- K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable);
-- V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable);
--
-- if (!filter.apply(key, val))
-- return null;
--
-- return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
-- }
-- }
--
-- /**
-- *
-- */
-- private static class CompoundIterator<T> extends GridIteratorAdapter<T> {
-- /** */
-- private static final long serialVersionUID = 4585888051556166304L;
--
-- /** */
-- private final List<GridIterator<T>> iters;
--
-- /** */
-- private int idx;
--
-- /** */
-- private GridIterator<T> iter;
--
-- /**
-- * @param iters Iterators.
-- */
-- private CompoundIterator(List<GridIterator<T>> iters) {
-- if (iters.isEmpty())
-- throw new IllegalArgumentException();
--
-- this.iters = iters;
--
-- iter = F.first(iters);
-- }
--
-- /** {@inheritDoc} */
-- @Override public boolean hasNextX() throws IgniteCheckedException {
-- if (iter.hasNextX())
-- return true;
--
-- idx++;
--
-- while (idx < iters.size()) {
-- iter = iters.get(idx);
--
-- if (iter.hasNextX())
-- return true;
--
-- idx++;
-- }
--
-- return false;
-- }
--
-- /** {@inheritDoc} */
-- @Override public T nextX() throws IgniteCheckedException {
-- if (!hasNextX())
-- throw new NoSuchElementException();
--
-- return iter.nextX();
-- }
--
-- /** {@inheritDoc} */
-- @Override public void removeX() throws IgniteCheckedException {
-- throw new UnsupportedOperationException();
-- }
-- }
--
-- /**
-- * Cached result.
-- */
-- private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
- /** Absolute position of each recipient. */
- private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
-- /** */
-- private CircularQueue<R> queue;
-
-- /** */
-- private int pruned;
-
- /** Absolute position of each recipient. */
- private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
--
-- /**
-- * @param rcpt ID of the recipient.
-- */
-- protected CachedResult(Object rcpt) {
-- boolean res = addRecipient(rcpt);
--
-- assert res;
-- }
--
-- /**
-- * Close if this result does not have any other recipients.
-- *
-- * @param rcpt ID of the recipient.
-- * @throws IgniteCheckedException If failed.
-- */
-- public void closeIfNotShared(Object rcpt) throws IgniteCheckedException {
-- assert isDone();
--
-- synchronized (recipients) {
-- if (recipients.isEmpty())
-- return;
--
-- recipients.remove(rcpt);
++ recipients.remove(rcpt);
if (recipients.isEmpty())
get().close();
@@@ -3022,85 -3020,126 +2879,217 @@@
}
/**
-- * Query for {@link IndexingSpi}.
*
-- * @param keepPortable Keep portable flag.
-- * @return Query.
*/
-- public <R> CacheQuery<R> createSpiQuery(boolean keepPortable) {
-- return new GridCacheQueryAdapter<>(cctx,
-- SPI,
-- null,
-- null,
-- null,
-- null,
-- false,
-- keepPortable);
++ private abstract class AbstractLazySwapEntry {
++ /** */
++ private K key;
++
++ /** */
++ private V val;
++
++ /**
++ * @return Key bytes.
++ */
++ protected abstract byte[] keyBytes();
++
++ /**
++ * @return Value.
++ * @throws IgniteCheckedException If failed.
++ */
++ protected abstract V unmarshalValue() throws IgniteCheckedException;
++
++ /**
++ * @return Key.
++ */
++ K key() {
++ try {
++ if (key != null)
++ return key;
++
++ key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false);
++
++ return key;
++ }
++ catch (IgniteCheckedException e) {
++ throw new IgniteException(e);
++ }
++ }
++
++ /**
++ * @return Value.
++ */
++ V value() {
++ try {
++ if (val != null)
++ return val;
++
++ val = unmarshalValue();
++
++ return val;
++ }
++ catch (IgniteCheckedException e) {
++ throw new IgniteException(e);
++ }
++ }
++
++ /**
++ * @return TTL.
++ */
++ abstract long timeToLive();
++
++ /**
++ * @return Expire time.
++ */
++ abstract long expireTime();
++
++ /**
++ * @return Version.
++ */
++ abstract GridCacheVersion version();
}
/**
-- * Creates user's predicate based scan query.
*
-- * @param filter Scan filter.
-- * @param part Partition.
-- * @param keepPortable Keep portable flag.
-- * @return Created query.
*/
-- public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
-- @Nullable Integer part, boolean keepPortable) {
++ private class LazySwapEntry extends AbstractLazySwapEntry {
++ /** */
++ private final Map.Entry<byte[], byte[]> e;
-- return new GridCacheQueryAdapter<>(cctx,
-- SCAN,
-- null,
-- null,
-- (IgniteBiPredicate<Object, Object>)filter,
-- part,
-- false,
-- keepPortable);
++ /**
++ * @param e Entry with
++ */
++ LazySwapEntry(Map.Entry<byte[], byte[]> e) {
++ this.e = e;
++ }
++
++ /** {@inheritDoc} */
++ @Override protected byte[] keyBytes() {
++ return e.getKey();
++ }
++
++ /** {@inheritDoc} */
++ @SuppressWarnings("IfMayBeConditional")
++ @Override protected V unmarshalValue() throws IgniteCheckedException {
++ IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
++
++ CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
++
++ return obj.value(cctx.cacheObjectContext(), false);
++ }
++
++ /** {@inheritDoc} */
++ @Override long timeToLive() {
++ return GridCacheSwapEntryImpl.timeToLive(e.getValue());
++ }
++
++ /** {@inheritDoc} */
++ @Override long expireTime() {
++ return GridCacheSwapEntryImpl.expireTime(e.getValue());
++ }
++
++ /** {@inheritDoc} */
++ @Override GridCacheVersion version() {
++ return GridCacheSwapEntryImpl.version(e.getValue());
++ }
}
/**
-- * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery}
-- * documentation.
*
-- * @param clsName Query class name.
-- * @param search Search clause.
-- * @param keepPortable Keep portable flag.
-- * @return Created query.
*/
-- public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
-- String search, boolean keepPortable) {
-- A.notNull("clsName", clsName);
-- A.notNull("search", search);
++ private class LazyOffheapEntry extends AbstractLazySwapEntry {
++ /** */
++ private final T2<Long, Integer> keyPtr;
-- return new GridCacheQueryAdapter<>(cctx,
-- TEXT,
-- clsName,
-- search,
-- null,
-- null,
-- false,
-- keepPortable);
++ /** */
++ private final T2<Long, Integer> valPtr;
++
++ /**
++ * @param keyPtr Key address.
++ * @param valPtr Value address.
++ */
++ private LazyOffheapEntry(T2<Long, Integer> keyPtr, T2<Long, Integer> valPtr) {
++ assert keyPtr != null;
++ assert valPtr != null;
++
++ this.keyPtr = keyPtr;
++ this.valPtr = valPtr;
++ }
++
++ /** {@inheritDoc} */
++ @Override protected byte[] keyBytes() {
++ return U.copyMemory(keyPtr.get1(), keyPtr.get2());
++ }
++
++ /** {@inheritDoc} */
++ @Override protected V unmarshalValue() throws IgniteCheckedException {
++ long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
++
++ CacheObject obj = cctx.fromOffheap(ptr, false);
++
++ V val = CU.value(obj, cctx, false);
++
++ assert val != null;
++
++ return val;
++ }
++
++ /** {@inheritDoc} */
++ @Override long timeToLive() {
++ return GridCacheOffheapSwapEntry.timeToLive(valPtr.get1());
++ }
++
++ /** {@inheritDoc} */
++ @Override long expireTime() {
++ return GridCacheOffheapSwapEntry.expireTime(valPtr.get1());
++ }
++
++ /** {@inheritDoc} */
++ @Override GridCacheVersion version() {
++ return GridCacheOffheapSwapEntry.version(valPtr.get1());
++ }
}
/**
- * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
- * documentation.
*
- * @param qry Query.
- * @param keepPortable Keep portable flag.
- * @return Created query.
*/
- public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
- A.notNull(qry, "qry");
+ private class OffheapIteratorClosure
+ extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> {
+ /** */
+ private static final long serialVersionUID = 7410163202728985912L;
- return new GridCacheQueryAdapter<>(cctx,
- SQL_FIELDS,
- null,
- qry,
- null,
- null,
- false,
- keepPortable);
+ /** */
+ private IgniteBiPredicate<K, V> filter;
+
+ /** */
+ private boolean keepPortable;
+
+ /**
+ * @param filter Filter.
+ * @param keepPortable Keep portable flag.
+ */
+ private OffheapIteratorClosure(
+ @Nullable IgniteBiPredicate<K, V> filter,
+ boolean keepPortable) {
+ assert filter != null;
+
+ this.filter = filter;
+ this.keepPortable = keepPortable;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr,
+ T2<Long, Integer> valPtr)
+ throws IgniteCheckedException {
+ LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr);
+
+ K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable);
+ V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable);
+
+ if (!filter.apply(key, val))
+ return null;
+
+ return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
+ }
- } /**
- * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
- * documentation.
- *
- * @param qry Query.
- * @param keepPortable Keep portable flag.
- * @return Created query.
- */
- public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
- A.notNull(qry, "qry");
-
- return new GridCacheQueryAdapter<>(cctx,
- SQL_FIELDS,
- null,
- qry,
- null,
- null,
- false,
- keepPortable);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae09fa9f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 9117863,df79232..afb599d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@@ -92,13 -91,13 +92,10 @@@ public class GridRestProcessor extends
/** Default session timout. */
private static final int DEFAULT_SES_TIMEOUT = 30_000;
--
-- /** Protocols. */
-- private final Collection<GridRestProtocol> protos = new ArrayList<>();
--
/** Command handlers. */
protected final Map<GridRestCommand, GridRestCommandHandler> handlers = new EnumMap<>(GridRestCommand.class);
--
++ /** Protocols. */
++ private final Collection<GridRestProtocol> protos = new ArrayList<>();
/** */
private final CountDownLatch startLatch = new CountDownLatch(1);
@@@ -132,6 -131,6 +129,85 @@@
private final long sesTtl;
/**
++ * @param ctx Context.
++ */
++ public GridRestProcessor(GridKernalContext ctx) {
++ super(ctx);
++
++ long sesExpTime0;
++ String sesExpTime = null;
++
++ try {
++ sesExpTime = System.getProperty(IgniteSystemProperties.IGNITE_REST_SESSION_TIMEOUT);
++
++ if (sesExpTime != null)
++ sesExpTime0 = Long.valueOf(sesExpTime) * 1000;
++ else
++ sesExpTime0 = DEFAULT_SES_TIMEOUT;
++ }
++ catch (NumberFormatException ignore) {
++ U.warn(log, "Failed parsing IGNITE_REST_SESSION_TIMEOUT system variable [IGNITE_REST_SESSION_TIMEOUT="
++ + sesExpTime + "]");
++
++ sesExpTime0 = DEFAULT_SES_TIMEOUT;
++ }
++
++ sesTtl = sesExpTime0;
++
++ sesTimeoutCheckerThread = new IgniteThread(ctx.gridName(), "session-timeout-worker",
++ new GridWorker(ctx.gridName(), "session-timeout-worker", log) {
++ @Override protected void body() throws InterruptedException {
++ while (!isCancelled()) {
++ Thread.sleep(SES_TIMEOUT_CHECK_DELAY);
++
++ for (Map.Entry<UUID, Session> e : sesId2Ses.entrySet()) {
++ Session ses = e.getValue();
++
++ if (ses.isTimedOut(sesTtl)) {
++ sesId2Ses.remove(ses.sesId, ses);
++
++ clientId2SesId.remove(ses.clientId, ses.sesId);
++ }
++ }
++ }
++ }
++ });
++ }
++
++/**
++ * Applies interceptor to a response object.
++ * Specially handler {@link Map} and {@link Collection} responses.
++ *
++ * @param obj Response object.
++ * @param interceptor Interceptor to apply.
++ * @return Intercepted object.
++ */
++ private static Object interceptSendObject(Object obj, ConnectorMessageInterceptor interceptor) {
++ if (obj instanceof Map) {
++ Map<Object, Object> original = (Map<Object, Object>)obj;
++
++ Map<Object, Object> m = new HashMap<>();
++
++ for (Map.Entry e : original.entrySet())
++ m.put(interceptor.onSend(e.getKey()), interceptor.onSend(e.getValue()));
++
++ return m;
++ }
++ else if (obj instanceof Collection) {
++ Collection<Object> original = (Collection<Object>)obj;
++
++ Collection<Object> c = new ArrayList<>(original.size());
++
++ for (Object e : original)
++ c.add(interceptor.onSend(e));
++
++ return c;
++ }
++ else
++ return interceptor.onSend(obj);
++ }
++
++ /**
* @param req Request.
* @return Future.
*/
@@@ -386,52 -385,52 +462,6 @@@
}
}
-- /**
-- * @param ctx Context.
-- */
-- public GridRestProcessor(GridKernalContext ctx) {
-- super(ctx);
--
-- long sesExpTime0;
-- String sesExpTime = null;
--
-- try {
-- sesExpTime = System.getProperty(IgniteSystemProperties.IGNITE_REST_SESSION_TIMEOUT);
--
-- if (sesExpTime != null)
-- sesExpTime0 = Long.valueOf(sesExpTime) * 1000;
-- else
-- sesExpTime0 = DEFAULT_SES_TIMEOUT;
-- }
-- catch (NumberFormatException ignore) {
-- U.warn(log, "Failed parsing IGNITE_REST_SESSION_TIMEOUT system variable [IGNITE_REST_SESSION_TIMEOUT="
-- + sesExpTime + "]");
--
-- sesExpTime0 = DEFAULT_SES_TIMEOUT;
-- }
--
-- sesTtl = sesExpTime0;
--
-- sesTimeoutCheckerThread = new IgniteThread(ctx.gridName(), "session-timeout-worker",
-- new GridWorker(ctx.gridName(), "session-timeout-worker", log) {
-- @Override protected void body() throws InterruptedException {
-- while (!isCancelled()) {
-- Thread.sleep(SES_TIMEOUT_CHECK_DELAY);
--
-- for (Map.Entry<UUID, Session> e : sesId2Ses.entrySet()) {
-- Session ses = e.getValue();
--
-- if (ses.isTimedOut(sesTtl)) {
-- sesId2Ses.remove(ses.sesId, ses);
--
-- clientId2SesId.remove(ses.clientId, ses.sesId);
-- }
-- }
-- }
-- }
-- });
-- }
--
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (isRestEnabled()) {
@@@ -516,7 -517,54 +546,7 @@@
}
}
- /**
- * Applies {@link ConnectorMessageInterceptor}
- * from {@link ConnectorConfiguration#getMessageInterceptor()} ()}
- * to all user parameters in the request.
- *
- * @param req Client request.
- */
- private void interceptRequest(GridRestRequest req) {
- ConnectorMessageInterceptor interceptor = config().getMessageInterceptor();
-
- if (interceptor == null)
- return;
-
- if (req instanceof GridRestCacheRequest) {
- GridRestCacheRequest req0 = (GridRestCacheRequest) req;
-
- req0.key(interceptor.onReceive(req0.key()));
- req0.value(interceptor.onReceive(req0.value()));
- req0.value2(interceptor.onReceive(req0.value2()));
-
- Map<Object, Object> oldVals = req0.values();
-
- if (oldVals != null) {
- Map<Object, Object> newVals = U.newHashMap(oldVals.size());
-
- for (Map.Entry<Object, Object> e : oldVals.entrySet())
- newVals.put(interceptor.onReceive(e.getKey()), interceptor.onReceive(e.getValue()));
-
- req0.values(U.sealMap(newVals));
- }
- }
- else if (req instanceof GridRestTaskRequest) {
- GridRestTaskRequest req0 = (GridRestTaskRequest) req;
-
- List<Object> oldParams = req0.params();
-
- if (oldParams != null) {
- Collection<Object> newParams = new ArrayList<>(oldParams.size());
-
- for (Object o : oldParams)
- newParams.add(interceptor.onReceive(o));
-
- req0.params(U.sealList(newParams));
- }
- }
- }
-
-- /**
++ /**
* Applies {@link ConnectorMessageInterceptor}
* from {@link ConnectorConfiguration#getMessageInterceptor()} ()}
* to all user parameters in the request.
@@@ -562,8 -610,8 +592,7 @@@
}
}
}
--
-- /**
++/**
* Applies {@link ConnectorMessageInterceptor} from
* {@link ConnectorConfiguration#getMessageInterceptor()}
* to all user objects in the response.