You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:47 UTC
[26/28] incubator-ignite git commit: ignite-545: merge from sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index d54e06f..5cbe377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -121,11 +121,7 @@ public class IgnitionEx {
};
/** */
- private static ThreadLocal<Boolean> clientMode = new ThreadLocal<Boolean>() {
- @Override protected Boolean initialValue() {
- return null;
- }
- };
+ private static ThreadLocal<Boolean> clientMode = new ThreadLocal<>();
/**
* Checks runtime version to be 1.7.x or 1.8.x.
@@ -196,7 +192,7 @@ public class IgnitionEx {
* @return Client mode flag.
*/
public static boolean isClientMode() {
- return clientMode.get();
+ return clientMode.get() == null ? false : clientMode.get();
}
/**
@@ -1458,8 +1454,9 @@ public class IgnitionEx {
DFLT_PUBLIC_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
- // Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
+ if (!myCfg.isClientMode())
+ // Pre-start all threads as they are guaranteed to be needed.
+ ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads();
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
@@ -1471,7 +1468,7 @@ public class IgnitionEx {
new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
// Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+ ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads();
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
@@ -1764,20 +1761,14 @@ public class IgnitionEx {
public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException {
List<CacheConfiguration> cacheCfgs = new ArrayList<>();
- boolean clientDisco = cfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi;
-
- // Add marshaller and utility caches.
- if (!clientDisco) {
- cacheCfgs.add(marshallerSystemCache());
+ cacheCfgs.add(marshallerSystemCache());
- cacheCfgs.add(utilitySystemCache());
- }
+ cacheCfgs.add(utilitySystemCache());
if (IgniteComponentType.HADOOP.inClassPath())
cacheCfgs.add(CU.hadoopSystemCache());
- if (cfg.getAtomicConfiguration() != null && !clientDisco)
- cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
+ cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
CacheConfiguration[] userCaches = cfg.getCacheConfiguration();
@@ -1854,7 +1845,7 @@ public class IgnitionEx {
if (cfg.getSwapSpaceSpi() == null) {
boolean needSwap = false;
- if (cfg.getCacheConfiguration() != null) {
+ if (cfg.getCacheConfiguration() != null && !Boolean.TRUE.equals(cfg.isClientMode())) {
for (CacheConfiguration c : cfg.getCacheConfiguration()) {
if (c.isSwapEnabled()) {
needSwap = true;
@@ -2005,7 +1996,6 @@ public class IgnitionEx {
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setCacheMode(cfg.getCacheMode());
ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
- ccfg.setNearConfiguration(new NearCacheConfiguration());
if (cfg.getCacheMode() == PARTITIONED)
ccfg.setBackups(cfg.getBackups());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
index 5dca2f2..21f2264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.marshaller.*;
import org.jsr166.*;
@@ -49,10 +50,29 @@ public abstract class MarshallerContextAdapter implements MarshallerContext {
Enumeration<URL> urls = ldr.getResources(CLS_NAMES_FILE);
- while (urls.hasMoreElements())
+ boolean foundClsNames = false;
+
+ while (urls.hasMoreElements()) {
processResource(urls.nextElement());
- processResource(ldr.getResource(JDK_CLS_NAMES_FILE));
+ foundClsNames = true;
+ }
+
+ if (!foundClsNames)
+ throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+ "[file=" + CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+ URL jdkClsNames = ldr.getResource(JDK_CLS_NAMES_FILE);
+
+ if (jdkClsNames == null)
+ throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+ "[file=" + JDK_CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+ processResource(jdkClsNames);
+
+ checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE);
+ checkHasClassName(GridDhtPartitionMap.class.getName(), ldr, CLS_NAMES_FILE);
+ checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE);
}
catch (IOException e) {
throw new IllegalStateException("Failed to initialize marshaller context.", e);
@@ -60,6 +80,18 @@ public abstract class MarshallerContextAdapter implements MarshallerContext {
}
/**
+ * @param clsName Class name.
+ * @param ldr Class loader used to get properties file.
+ * @param fileName File name.
+ */
+ private void checkHasClassName(String clsName, ClassLoader ldr, String fileName) {
+ if (!map.containsKey(clsName.hashCode()))
+ throw new IgniteException("Failed to read class name from class names properties file. " +
+ "Make sure class names properties file packaged with ignite binaries is not corrupted " +
+ "[clsName=" + clsName + ", fileName=" + fileName + ", ldr=" + ldr + ']');
+ }
+
+ /**
* @param url Resource URL.
* @throws IOException In case of error.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 85939a6..e614408 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -59,7 +59,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
new ContinuousQueryListener(log, workDir),
null,
- true,
+ ctx.cache().marshallerCache().context().affinityNode(),
true
);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
index ee32692..779b54d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
@@ -22,19 +22,17 @@ import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import java.io.*;
-
/**
* Custom event.
*/
public class DiscoveryCustomEvent extends DiscoveryEvent {
/** */
private static final long serialVersionUID = 0L;
-
+
/**
* Built-in event type: custom event sent.
* <br>
- * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(Serializable)}.
+ * Generated when someone invokes {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)}.
* <p>
*
* @see DiscoveryCustomEvent
@@ -42,7 +40,7 @@ public class DiscoveryCustomEvent extends DiscoveryEvent {
public static final int EVT_DISCOVERY_CUSTOM_EVT = 18;
/** */
- private Serializable data;
+ private DiscoveryCustomMessage customMsg;
/** Affinity topology version. */
private AffinityTopologyVersion affTopVer;
@@ -57,15 +55,15 @@ public class DiscoveryCustomEvent extends DiscoveryEvent {
/**
* @return Data.
*/
- public Serializable data() {
- return data;
+ public DiscoveryCustomMessage customMessage() {
+ return customMsg;
}
/**
- * @param data New data.
+ * @param customMsg New custom message.
*/
- public void data(Serializable data) {
- this.data = data;
+ public void customMessage(DiscoveryCustomMessage customMsg) {
+ this.customMsg = customMsg;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 11af716..6a6f22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -73,6 +73,7 @@ public class IgfsMarshaller {
}
/**
+ * Serializes the message and sends it into the given output stream.
* @param msg Message.
* @param hdr Message header.
* @param out Output.
@@ -119,6 +120,7 @@ public class IgfsMarshaller {
IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+ U.writeString(out, req.userName());
writePath(out, req.path());
writePath(out, req.destinationPath());
out.writeBoolean(req.flag());
@@ -236,6 +238,7 @@ public class IgfsMarshaller {
case OPEN_CREATE: {
IgfsPathControlRequest req = new IgfsPathControlRequest();
+ req.userName(U.readString(in));
req.path(readPath(in));
req.destinationPath(readPath(in));
req.flag(in.readBoolean());
@@ -298,8 +301,6 @@ public class IgfsMarshaller {
}
}
- assert msg != null;
-
msg.command(cmd);
return msg;
@@ -341,34 +342,4 @@ public class IgfsMarshaller {
return null;
}
-
- /**
- * Writes string to output.
- *
- * @param out Data output.
- * @param str String.
- * @throws IOException If write failed.
- */
- private void writeString(DataOutput out, @Nullable String str) throws IOException {
- out.writeBoolean(str != null);
-
- if (str != null)
- out.writeUTF(str);
- }
-
- /**
- * Reads string from input.
- *
- * @param in Data input.
- * @return Read string.
- * @throws IOException If read failed.
- */
- @Nullable private String readString(DataInput in) throws IOException {
- boolean hasStr = in.readBoolean();
-
- if (hasStr)
- return in.readUTF();
-
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 7ed1619..2f6e6e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.igfs.common;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
@@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage {
/** Last modification time. */
private long modificationTime;
+ /** The user name this control request is made on behalf of. */
+ private String userName;
+
/**
* @param path Path.
*/
@@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage {
@Override public String toString() {
return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
}
+
+ /**
+ * Getter for the user name.
+ * @return user name.
+ */
+ public final String userName() {
+ assert userName != null;
+
+ return userName;
+ }
+
+ /**
+ * Setter for the user name.
+ * @param userName the user name.
+ */
+ public final void userName(String userName) {
+ this.userName = IgfsUtils.fixUserName(userName);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index c93c059..bea4256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,7 +23,7 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -31,7 +31,7 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.swapspace.*;
+
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
@@ -439,46 +439,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
return ctx.cache().cache(cacheName).containsKey(key);
}
- @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
- @Nullable ClassLoader ldr) {
- assert ctx.swap().enabled();
-
- try {
- ctx.swap().write(spaceName, key, val, ldr);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- @SuppressWarnings({"unchecked"})
- @Nullable @Override public <T> T readFromSwap(String spaceName, SwapKey key,
- @Nullable ClassLoader ldr) {
- try {
- assert ctx.swap().enabled();
-
- return ctx.swap().readValue(spaceName, key, ldr);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
@Override public int partition(String cacheName, Object key) {
return ctx.cache().cache(cacheName).affinity().partition(key);
}
- @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) {
- try {
- assert ctx.swap().enabled();
-
- ctx.swap().remove(spaceName, key, null, ldr);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
@Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
for (GridComponent comp : ctx) {
IgniteNodeValidationResult err = comp.validateNode(node);
@@ -508,26 +472,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
}
}
- @SuppressWarnings("unchecked")
- @Nullable @Override public <V> V readValueFromOffheapAndSwap(@Nullable String spaceName,
- Object key, @Nullable ClassLoader ldr) {
- try {
- IgniteInternalCache<Object, V> cache = ctx.cache().cache(spaceName);
-
- GridCacheContext cctx = cache.context();
-
- if (cctx.isNear())
- cctx = cctx.near().dht().context();
-
- GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
-
- return e != null ? CU.<V>value(e.value(), cctx, true) : null;
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
@Override public MessageFormatter messageFormatter() {
return ctx.io().formatter();
}
@@ -540,6 +484,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
return ctx.discovery().tryFailNode(nodeId);
}
+ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+ }
+
+ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+ }
+
/**
* @param e Exception to handle.
* @return GridSpiException Converted exception.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 2e80b6f..ce2a36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
private final GridMessageListener lsnr = new CheckpointRequestListener();
/** */
- private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap;
/** */
- private final Collection<IgniteUuid> closedSess = new GridBoundedConcurrentLinkedHashSet<>(
- MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q);
+ private final Collection<IgniteUuid> closedSess;
/** Grid marshaller. */
private final Marshaller marsh;
@@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
super(ctx, ctx.config().getCheckpointSpi());
marsh = ctx.config().getMarshaller();
+
+ if (enabled()) {
+ keyMap = new ConcurrentHashMap8<>();
+
+ closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS,
+ MAX_CLOSED_SESS,
+ 0.75f,
+ 256,
+ PER_SEGMENT_Q);
+ }
+ else {
+ keyMap = null;
+
+ closedSess = null;
+ }
}
/** {@inheritDoc} */
@@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return Session IDs.
*/
public Collection<IgniteUuid> sessionIds() {
- return new ArrayList<>(keyMap.keySet());
+ return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.<IgniteUuid>emptyList();
}
/**
@@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return {@code true} if checkpoint has been actually saved, {@code false} otherwise.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope,
- long timeout, boolean override) throws IgniteCheckedException {
+ public boolean storeCheckpoint(GridTaskSessionInternal ses,
+ String key,
+ Object state,
+ ComputeTaskSessionScope scope,
+ long timeout,
+ boolean override)
+ throws IgniteCheckedException
+ {
+ if (!enabled())
+ return false;
+
assert ses != null;
assert key != null;
@@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return Whether or not checkpoint was removed.
*/
public boolean removeCheckpoint(String key) {
+ if (!enabled())
+ return false;
+
assert key != null;
boolean rmv = false;
@@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @return Whether or not checkpoint was removed.
*/
public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) {
+ if (!enabled())
+ return false;
+
assert ses != null;
assert key != null;
@@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException {
+ if (!enabled())
+ return null;
+
assert ses != null;
assert key != null;
@@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @param cleanup Whether cleanup or not.
*/
public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) {
+ if (!enabled())
+ return;
+
closedSess.add(ses.getId());
// If on task node.
@@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
@Override public void printMemoryStats() {
X.println(">>>");
X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']');
- X.println(">>> keyMap: " + keyMap.size());
+ X.println(">>> keyMap: " + (keyMap != null ? keyMap.size() : 0));
}
/**
@@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
if (log.isDebugEnabled())
log.debug("Received checkpoint request: " + req);
+ if (!enabled())
+ return;
+
IgniteUuid sesId = req.getSessionId();
if (closedSess.contains(sesId)) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c877d57..4382731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1211,6 +1211,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
if (p != null) {
try {
+ if (p instanceof GridLifecycleAwareMessageFilter)
+ ((GridLifecycleAwareMessageFilter)p).initialize(ctx);
+ else
+ ctx.resource().injectGeneric(p);
+
addMessageListener(TOPIC_COMM_USER,
new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
}
@@ -1695,13 +1700,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
throws IgniteCheckedException {
this.topic = topic;
this.predLsnr = predLsnr;
-
- if (predLsnr != null) {
- if (predLsnr instanceof GridLifecycleAwareMessageFilter)
- ((GridLifecycleAwareMessageFilter)predLsnr).initialize(ctx);
- else
- ctx.resource().injectGeneric(predLsnr);
- }
}
/** {@inheritDoc} */
@@ -1724,69 +1722,84 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- Object msgBody = ioMsg.body();
-
- assert msgBody != null || ioMsg.bodyBytes() != null;
+ busyLock.readLock();
try {
- byte[] msgTopicBytes = ioMsg.topicBytes();
-
- Object msgTopic = ioMsg.topic();
-
- GridDeployment dep = ioMsg.deployment();
-
- if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
- ioMsg.deploymentClassName() != null) {
- dep = ctx.deploy().getGlobalDeployment(
- ioMsg.deploymentMode(),
- ioMsg.deploymentClassName(),
- ioMsg.deploymentClassName(),
- ioMsg.userVersion(),
- nodeId,
- ioMsg.classLoaderId(),
- ioMsg.loaderParticipants(),
- null);
-
- if (dep == null)
- throw new IgniteDeploymentCheckedException(
- "Failed to obtain deployment information for user message. " +
- "If you are using custom message or topic class, try implementing " +
- "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
-
- ioMsg.deployment(dep); // Cache deployment.
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received user message while stopping (will ignore) [nodeId=" +
+ nodeId + ", msg=" + msg + ']');
+
+ return;
}
- // Unmarshall message topic if needed.
- if (msgTopic == null && msgTopicBytes != null) {
- msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+ Object msgBody = ioMsg.body();
- ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
- }
+ assert msgBody != null || ioMsg.bodyBytes() != null;
- if (!F.eq(topic, msgTopic))
- return;
+ try {
+ byte[] msgTopicBytes = ioMsg.topicBytes();
+
+ Object msgTopic = ioMsg.topic();
+
+ GridDeployment dep = ioMsg.deployment();
+
+ if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+ ioMsg.deploymentClassName() != null) {
+ dep = ctx.deploy().getGlobalDeployment(
+ ioMsg.deploymentMode(),
+ ioMsg.deploymentClassName(),
+ ioMsg.deploymentClassName(),
+ ioMsg.userVersion(),
+ nodeId,
+ ioMsg.classLoaderId(),
+ ioMsg.loaderParticipants(),
+ null);
+
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException(
+ "Failed to obtain deployment information for user message. " +
+ "If you are using custom message or topic class, try implementing " +
+ "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+ ioMsg.deployment(dep); // Cache deployment.
+ }
- if (msgBody == null) {
- msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+ // Unmarshall message topic if needed.
+ if (msgTopic == null && msgTopicBytes != null) {
+ msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
- ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
- }
+ ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+ }
- // Resource injection.
- if (dep != null)
- ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
- msg + ']', e);
- }
+ if (!F.eq(topic, msgTopic))
+ return;
+
+ if (msgBody == null) {
+ msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
- if (msgBody != null) {
- if (predLsnr != null) {
- if (!predLsnr.apply(nodeId, msgBody))
- removeMessageListener(TOPIC_COMM_USER, this);
+ ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+ }
+
+ // Resource injection.
+ if (dep != null)
+ ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+ msg + ']', e);
+ }
+
+ if (msgBody != null) {
+ if (predLsnr != null) {
+ if (!predLsnr.apply(nodeId, msgBody))
+ removeMessageListener(TOPIC_COMM_USER, this);
+ }
}
}
+ finally {
+ busyLock.readUnlock();
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
new file mode 100644
index 0000000..2005d4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.cluster.*;
+
+/**
+ * Listener for custom discovery events carrying a {@link DiscoveryCustomMessage}.
+ */
+public interface CustomEventListener<T extends DiscoveryCustomMessage> {
+ /**
+ * @param snd Sender.
+ * @param msg Message.
+ */
+ public void onCustomEvent(ClusterNode snd, T msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
new file mode 100644
index 0000000..23f8bda
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.spi.discovery.*;
+import org.jetbrains.annotations.*;
+
+/**
+ *
+ */
+class CustomMessageWrapper implements DiscoverySpiCustomMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final DiscoveryCustomMessage delegate;
+
+ /**
+ * @param delegate Delegate.
+ */
+ CustomMessageWrapper(DiscoveryCustomMessage delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ DiscoveryCustomMessage res = delegate.ackMessage();
+
+ return res == null ? null : new CustomMessageWrapper(res);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return delegate.isMutable();
+ }
+
+ /**
+ * @return Delegate.
+ */
+ public DiscoveryCustomMessage delegate() {
+ return delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return delegate.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
new file mode 100644
index 0000000..401486d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Serializable custom message that is passed through discovery (see {@code GridDiscoveryManager#sendCustomEvent}).
+ */
+public interface DiscoveryCustomMessage extends Serializable {
+ /**
+ * @return Unique custom message ID.
+ */
+ public IgniteUuid id();
+
+ /**
+ * Whether or not minor version of topology should be increased on message receive.
+ *
+ * @return {@code true} if minor topology version should be increased.
+ * @see AffinityTopologyVersion#minorTopVer
+ */
+ public boolean incrementMinorTopologyVersion();
+
+ /**
+ * Called when custom message has been handled by all nodes.
+ *
+ * @return Ack message or {@code null} if ack is not required.
+ */
+ @Nullable public DiscoveryCustomMessage ackMessage();
+
+ /**
+ * @return {@code true} if message can be modified during listener notification. Changes will be sent to next nodes.
+ */
+ public boolean isMutable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 0950774..71fbc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.jobmetrics.*;
import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
@@ -165,10 +166,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final GridLocalMetrics metrics = createMetrics();
/** Metrics update worker. */
- private final MetricsUpdater metricsUpdater = new MetricsUpdater();
+ private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
/** Custom event listener. */
- private GridPlainInClosure<Serializable> customEvtLsnr;
+ private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
+ new ConcurrentHashMap8<>();
/** Map of dynamic cache filters. */
private Map<String, CachePredicate> registeredCaches = new HashMap<>();
@@ -176,6 +178,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ /** Received custom messages history. */
+ private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
+
/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
@@ -214,6 +219,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @param cacheName Cache name.
* @param filter Cache filter.
+ * @param nearEnabled Near enabled flag.
* @param loc {@code True} if cache is local.
*/
public void setCacheFilter(
@@ -240,12 +246,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @param cacheName Cache name.
* @param clientNodeId Near node ID.
+ * @param nearEnabled Near enabled flag.
*/
public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- if (predicate != null)
- predicate.addClientNode(clientNodeId, nearEnabled);
+ if (pred != null)
+ pred.addClientNode(clientNodeId, nearEnabled);
}
/**
@@ -279,17 +286,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- /**
- * @param evtType Event type.
- * @return Next affinity topology version.
- */
- private AffinityTopologyVersion nextTopologyVersion(int evtType, long topVer) {
- if (evtType == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)
- minorTopVer++;
- else if (evtType != EVT_NODE_METRICS_UPDATED)
- minorTopVer = 0;
-
- return new AffinityTopologyVersion(topVer, minorTopVer);
+ /** {@inheritDoc} */
+ @Override protected void onKernalStart0() throws IgniteCheckedException {
+ if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode())
+ ctx.performance().add("Enable client mode for TcpDiscoverySpi " +
+ "(set TcpDiscoverySpi.forceServerMode to false)");
}
/** {@inheritDoc} */
@@ -328,7 +329,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
checkSegmentOnStart();
}
- new IgniteThread(metricsUpdater).start();
+ metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ);
spi.setMetricsProvider(createMetricsProvider());
@@ -356,14 +357,41 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ClusterNode node,
Collection<ClusterNode> topSnapshot,
Map<Long, Collection<ClusterNode>> snapshots,
- @Nullable Serializable data
+ @Nullable DiscoverySpiCustomMessage spiCustomMsg
) {
+ DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
+ : ((CustomMessageWrapper)spiCustomMsg).delegate();
+
+ if (skipMessage(type, customMsg))
+ return;
+
final ClusterNode locNode = localNode();
if (snapshots != null)
topHist = snapshots;
- AffinityTopologyVersion nextTopVer = nextTopologyVersion(type, topVer);
+ boolean verChanged;
+
+ if (type == EVT_NODE_METRICS_UPDATED)
+ verChanged = false;
+ else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ assert customMsg != null;
+
+ if (customMsg.incrementMinorTopologyVersion()) {
+ minorTopVer++;
+
+ verChanged = true;
+ }
+ else
+ verChanged = false;
+ }
+ else {
+ minorTopVer = 0;
+
+ verChanged = true;
+ }
+
+ AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) {
for (DiscoCache c : discoCacheHist.values())
@@ -373,19 +401,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
- try {
- if (customEvtLsnr != null)
- customEvtLsnr.apply(data);
- }
- catch (Exception e) {
- U.error(log, "Failed to notify direct custom event listener: " + data, e);
+ for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
+ List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
+
+ if (list != null) {
+ for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
+ try {
+ lsnr.onCustomEvent(node, customMsg);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
+ }
+ }
+ }
}
}
// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
- if (type != EVT_NODE_METRICS_UPDATED) {
+ if (verChanged) {
DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id())));
discoCacheHist.put(nextTopVer, cache);
@@ -417,7 +452,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
return;
}
- discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data);
+ discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
}
});
@@ -486,10 +521,43 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * @param customEvtLsnr Custom event listener.
+ * @param type Message type.
+ * @param customMsg Custom message.
+ * @return {@code True} if should not process message.
*/
- public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) {
- this.customEvtLsnr = customEvtLsnr;
+ private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) {
+ if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { // Only custom events are checked for duplicates.
+ assert customMsg != null && customMsg.id() != null : customMsg;
+
+ if (rcvdCustomMsgs.contains(customMsg.id())) { // ID is already present in the received-message history.
+ if (log.isDebugEnabled())
+ log.debug("Received duplicated custom message, will ignore [msg=" + customMsg + "]");
+
+ return true;
+ }
+
+ rcvdCustomMsgs.addLast(customMsg.id());
+
+ while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE) // Drop oldest IDs so the history stays bounded.
+ rcvdCustomMsgs.pollFirst();
+ }
+
+ return false;
+ }
+
+ /**
+ * @param msgCls Message class the listener is registered for.
+ * @param lsnr Custom event listener; it is appended to the listeners already registered for {@code msgCls}.
+ */
+ public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) {
+ List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(msgCls);
+
+ if (list == null) { // No listener list has been stored for this class yet.
+ list = F.addIfAbsent(customEvtLsnrs, msgCls,
+ new CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>());
+ }
+
+ list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr); // Unchecked cast: list is keyed by msgCls.
 }
/**
@@ -660,7 +728,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Map<Integer, CacheMetrics> metrics = null;
for (GridCacheAdapter<?, ?> cache : caches) {
- if (cache.context().started() && cache.configuration().isStatisticsEnabled()) {
+ if (cache.configuration().isStatisticsEnabled() &&
+ cache.context().started() &&
+ cache.context().affinity().affinityTopologyVersion().topologyVersion() > 0) {
if (metrics == null)
metrics = U.newHashMap(caches.size());
@@ -952,11 +1022,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
getSpi().setListener(null);
// Stop discovery worker and metrics updater.
+ U.closeQuiet(metricsUpdateTask);
+
U.cancel(discoWrk);
- U.cancel(metricsUpdater);
U.join(discoWrk, log);
- U.join(metricsUpdater, log);
// Stop SPI itself.
stopSpi();
@@ -1218,13 +1288,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets alive remote nodes with at least one cache configured.
+ * Gets alive remote server nodes with at least one cache configured.
*
* @param topVer Topology version (maximum allowed node order).
* @return Collection of alive cache nodes.
*/
- public Collection<ClusterNode> aliveRemoteNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveRemoteNodesWithCaches(topVer.topologyVersion());
+ public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
+ }
+
+ /**
+ * Gets alive server nodes with at least one cache configured.
+ *
+ * @param topVer Topology version (maximum allowed node order).
+ * @return Collection of alive cache nodes.
+ */
+ public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
}
/**
@@ -1256,9 +1336,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node is a cache data node.
*/
public boolean cacheAffinityNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.dataNode(node);
+ return pred != null && pred.dataNode(node);
}
/**
@@ -1267,9 +1347,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node has near cache enabled.
*/
public boolean cacheNearNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.nearNode(node);
+ return pred != null && pred.nearNode(node);
}
/**
@@ -1278,9 +1358,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node has client cache (without near cache).
*/
public boolean cacheClientNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.clientNode(node);
+ return pred != null && pred.clientNode(node);
}
/**
@@ -1289,9 +1369,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return If cache with the given name is accessible on the given node.
*/
public boolean cacheNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.cacheNode(node);
+ return pred != null && pred.cacheNode(node);
}
/**
@@ -1384,10 +1464,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * @param evt Event.
+ * @param msg Custom message.
*/
- public void sendCustomEvent(Serializable evt) {
- getSpi().sendCustomEvent(evt);
+ public void sendCustomEvent(DiscoveryCustomMessage msg) {
+ getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
}
/**
@@ -1542,8 +1622,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Worker for discovery events. */
private class DiscoveryWorker extends GridWorker {
/** Event queue. */
- private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable>> evts =
- new LinkedBlockingQueue<>();
+ private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
+ DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
/** Node segmented event fired flag. */
private boolean nodeSegFired;
@@ -1609,9 +1689,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
AffinityTopologyVersion topVer,
ClusterNode node,
Collection<ClusterNode> topSnapshot,
- @Nullable Serializable data
+ @Nullable DiscoveryCustomMessage data
) {
- assert node != null;
+ assert node != null : data;
evts.add(F.t(type, topVer, node, topSnapshot, data));
}
@@ -1650,7 +1730,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** @throws InterruptedException If interrupted. */
@SuppressWarnings("DuplicateCondition")
private void body0() throws InterruptedException {
- GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable> evt = evts.take();
+ GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
+ DiscoveryCustomMessage> evt = evts.take();
int type = evt.get1();
@@ -1768,7 +1849,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
customEvt.type(type);
customEvt.topologySnapshot(topVer.topologyVersion(), null);
customEvt.affinityTopologyVersion(topVer);
- customEvt.data(evt.get5());
+ customEvt.customMessage(evt.get5());
ctx.event().record(customEvt);
}
@@ -1833,28 +1914,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
*
*/
- private class MetricsUpdater extends GridWorker {
+ private class MetricsUpdater implements Runnable {
/** */
private long prevGcTime = -1;
/** */
private long prevCpuTime = -1;
- /**
- *
- */
- private MetricsUpdater() {
- super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log);
- }
-
/** {@inheritDoc} */
- @Override protected void body() throws IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- U.sleep(METRICS_UPDATE_FREQ);
-
- gcCpuLoad = getGcCpuLoad();
- cpuLoad = getCpuLoad();
- }
+ @Override public void run() {
+ gcCpuLoad = getGcCpuLoad();
+ cpuLoad = getCpuLoad();
}
/**
@@ -2065,9 +2135,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final Collection<ClusterNode> aliveNodesWithCaches;
/**
- * Cached alive remote nodes with caches.
+ * Cached alive server nodes with caches.
+ */
+ private final Collection<ClusterNode> aliveSrvNodesWithCaches;
+
+ /**
+ * Cached alive remote server nodes with caches.
*/
- private final Collection<ClusterNode> aliveRmtNodesWithCaches;
+ private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
/**
* @param loc Local node.
@@ -2088,21 +2163,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
all.addAll(rmtNodes);
+ Collections.sort(all, GridNodeOrderComparator.INSTANCE);
+
allNodes = Collections.unmodifiableList(all);
- Map<String, Collection<ClusterNode>> cacheMap =
- new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> rmtCacheMap =
- new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> dhtNodesMap =
- new HashMap<>(allNodes.size(), 1.0f);
+ Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
+ Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
+ Map<String, Collection<ClusterNode>> dhtNodesMap =new HashMap<>(allNodes.size(), 1.0f);
Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveRmtNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
nodesByVer = new TreeMap<>();
long maxOrder0 = 0;
@@ -2154,8 +2229,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (alive(node.id())) {
aliveNodesWithCaches.add(node);
- if (!loc.id().equals(node.id()))
- aliveRmtNodesWithCaches.add(node);
+ if (!CU.clientNode(node)) {
+ aliveSrvNodesWithCaches.add(node);
+
+ if (!loc.id().equals(node.id()))
+ aliveRmtSrvNodesWithCaches.add(node);
+ }
}
}
@@ -2240,13 +2319,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * @return All nodes with at least one cache configured.
- */
- Collection<ClusterNode> allNodesWithCaches() {
- return allNodesWithCaches;
- }
-
- /**
* Gets collection of nodes which have version equal or greater than {@code ver}.
*
* @param ver Version to check.
@@ -2345,13 +2417,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets all alive remote nodes with at least one cache configured.
+ * Gets all alive remote server nodes with at least one cache configured.
*
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> aliveRemoteNodesWithCaches(final long topVer) {
- return filter(topVer, aliveRmtNodesWithCaches);
+ Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
+ return filter(topVer, aliveRmtSrvNodesWithCaches);
+ }
+
+ /**
+ * Gets all alive server nodes with at least one cache configured.
+ *
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
+ return filter(topVer, aliveSrvNodesWithCaches);
}
/**
@@ -2388,7 +2470,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
filterNodeMap(aliveRmtCacheNodes, leftNode);
aliveNodesWithCaches.remove(leftNode);
- aliveRmtNodesWithCaches.remove(leftNode);
+ aliveSrvNodesWithCaches.remove(leftNode);
+ aliveRmtSrvNodesWithCaches.remove(leftNode);
}
/**
@@ -2480,11 +2563,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private boolean loc;
/** Collection of client near nodes. */
- private Map<UUID, Boolean> clientNodes;
+ private ConcurrentHashMap<UUID, Boolean> clientNodes;
/**
* @param cacheFilter Cache filter.
* @param nearEnabled Near enabled flag.
+ * @param loc {@code True} if cache is local.
*/
private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) {
assert cacheFilter != null;
@@ -2498,9 +2582,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param nodeId Near node ID to add.
+ * @param nearEnabled Near enabled flag.
*/
public void addClientNode(UUID nodeId, boolean nearEnabled) {
- clientNodes.put(nodeId, nearEnabled);
+ clientNodes.putIfAbsent(nodeId, nearEnabled);
}
/**
@@ -2515,7 +2600,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if this node is a data node for given cache.
*/
public boolean dataNode(ClusterNode node) {
- return !node.isDaemon() && cacheFilter.apply(node);
+ return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
}
/**
@@ -2523,8 +2608,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if cache is accessible on the given node.
*/
public boolean cacheNode(ClusterNode node) {
- return !node.isClient() && !node.isDaemon() &&
- (cacheFilter.apply(node) || clientNodes.containsKey(node.id()));
+ return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
}
/**
@@ -2535,8 +2619,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (node.isDaemon())
return false;
- if (nearEnabled && cacheFilter.apply(node))
- return true;
+ if (CU.affinityNode(node, cacheFilter))
+ return nearEnabled;
Boolean near = clientNodes.get(node.id());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
index 9a81cd1..f1561bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.*;
import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
@@ -46,9 +45,6 @@ public class GridIndexingManager extends GridManagerAdapter<IndexingSpi> {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@Override public void start() throws IgniteCheckedException {
- if (!enabled())
- U.warn(log, "Indexing is disabled (to enable please configure GridIndexingSpi).");
-
startSpi();
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index e9df8b8..5373e46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -68,6 +68,18 @@ class GridAffinityAssignment implements Serializable {
}
 /**
+ * @param topVer Topology version.
+ * @param aff Assignment to copy from (its collections are reused by reference, not cloned).
+ */
+ GridAffinityAssignment(AffinityTopologyVersion topVer, GridAffinityAssignment aff) {
+ this.topVer = topVer;
+
+ assignment = aff.assignment;
+ primary = aff.primary;
+ backup = aff.backup;
+ }
+
+ /**
* @return Affinity assignment.
*/
public List<List<ClusterNode>> assignment() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index eccd9f9..c46490e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -32,6 +32,8 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+
/**
* Affinity cached function.
*/
@@ -221,6 +223,35 @@ public class GridAffinityAssignmentCache {
}
 /**
+ * Copies previous affinity assignment when discovery event does not cause affinity assignment changes
+ * (e.g. client node joins or leaves).
+ *
+ * @param evt Event.
+ * @param topVer Topology version.
+ */
+ public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
+ GridAffinityAssignment aff = head.get();
+
+ assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
+ assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
+
+ GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff); // Same data, new version.
+
+ affCache.put(topVer, assignmentCpy);
+ head.set(assignmentCpy);
+
+ for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+ if (entry.getKey().compareTo(topVer) <= 0) { // Future waits for this or an earlier version.
+ if (log.isDebugEnabled())
+ log.debug("Completing topology ready future (use previous affinity) " +
+ "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
+
+ entry.getValue().onDone(topVer);
+ }
+ }
+ }
+
+ /**
* @return Last calculated affinity version.
*/
public AffinityTopologyVersion lastVersion() {
@@ -422,6 +453,7 @@ public class GridAffinityAssignmentCache {
/**
*
+ * @param reqTopVer Required topology version.
*/
private AffinityReadyFuture(AffinityTopologyVersion reqTopVer) {
this.reqTopVer = reqTopVer;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index daa2bc2..aac63c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -164,14 +164,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
*
* @param cacheName Cache name.
* @param key Key to map.
+ * @param topVer Topology version.
* @return Affinity nodes, primary first.
* @throws IgniteCheckedException If failed.
*/
- public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException {
+ public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName,
+ K key,
+ AffinityTopologyVersion topVer)
+ throws IgniteCheckedException
+ {
A.notNull(key, "key");
- AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
-
AffinityInfo affInfo = affinityCache(cacheName, topVer);
if (affInfo == null)
@@ -181,6 +184,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
}
 /**
+ * Maps single key to primary and backup nodes using {@code ctx.discovery().topologyVersionEx()} as version.
+ *
+ * @param cacheName Cache name.
+ * @param key Key to map.
+ * @return Affinity nodes, primary first.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key)
+ throws IgniteCheckedException
+ {
+ return mapKeyToPrimaryAndBackups(cacheName, key, ctx.discovery().topologyVersionEx());
+ }
+
+ /**
* Gets affinity key for cache key.
*
* @param cacheName Cache name.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java
index 5d6062e..7a3fbee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+
import org.jetbrains.annotations.*;
import java.util.*;
@@ -91,6 +92,36 @@ public class CacheEvictableEntryImpl<K, V> implements EvictableEntry<K, V> {
}
/** {@inheritDoc} */
+ public int size() {
+ try {
+ GridCacheContext<Object, Object> cctx = cached.context();
+
+ KeyCacheObject key = cached.key();
+
+ byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
+
+ byte[] valBytes = null;
+
+ if (cctx.useOffheapEntry())
+ valBytes = cctx.offheap().get(cctx.swap().spaceName(), cached.partition(), key, keyBytes);
+ else {
+ CacheObject cacheObj = cached.valueBytes();
+
+ if (cacheObj != null)
+ valBytes = cacheObj.valueBytes(cctx.cacheObjectContext());
+ }
+
+ return valBytes == null ? keyBytes.length : keyBytes.length + valBytes.length;
+ }
+ catch (GridCacheEntryRemovedException e) {
+ return 0;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public V getValue() {
try {