You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/01/16 15:21:02 UTC
incubator-ignite git commit: interop .Net: Merge from sp31 (without .Net component).
Repository: incubator-ignite
Updated Branches:
refs/heads/master 725d79ff4 -> c0c28abdd
interop .Net: Merge from sp31 (without .Net component).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c0c28abd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c0c28abd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c0c28abd
Branch: refs/heads/master
Commit: c0c28abdd32c9fe63866f28044aecbe3386c868f
Parents: 725d79f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Jan 16 17:21:12 2015 +0400
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Jan 16 17:21:12 2015 +0400
----------------------------------------------------------------------
.../ignite/portables/PortableRawReader.java | 2 +-
.../grid/dotnet/GridDotNetConfiguration.java | 14 +++-
.../dotnet/GridDotNetPortableConfiguration.java | 28 +++-----
.../GridDotNetPortableTypeConfiguration.java | 15 +---
.../org/gridgain/grid/kernal/GridGainEx.java | 76 ++++++++++++++------
.../grid/kernal/GridNodeAttributes.java | 5 +-
.../processors/cache/GridCacheAdapter.java | 15 +++-
.../processors/cache/GridCacheContext.java | 2 +-
.../processors/cache/GridCacheEntryImpl.java | 2 +-
.../GridCacheContinuousQueryAdapter.java | 13 +++-
.../GridCacheContinuousQueryFilterEx.java | 32 +++++++++
.../GridCacheContinuousQueryHandler.java | 17 ++++-
.../GridCacheContinuousQueryHandlerV3.java | 61 ++++++++++++++++
.../GridCacheContinuousQueryHandlerV4.java | 61 ++++++++++++++++
.../GridCacheContinuousQueryListener.java | 5 ++
.../GridCacheContinuousQueryManager.java | 55 ++++++++++----
.../interop/GridInteropProcessorAdapter.java | 3 -
17 files changed, 318 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java b/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java
index 67e9b13..5801aa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/portables/PortableRawReader.java
@@ -112,7 +112,7 @@ public interface PortableRawReader {
* @return Object.
* @throws PortableException In case of error.
*/
- @Nullable public Object readObject() throws PortableException;
+ @Nullable public <T> T readObject() throws PortableException;
/**
* @return Byte array.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java
index 07a1a0d..a8a595d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetConfiguration.java
@@ -41,7 +41,8 @@ public class GridDotNetConfiguration implements PortableMarshalAware {
/**
* Copy constructor.
- * @param cfg configuration to copy.
+ *
+ * @param cfg Configuration to copy.
*/
public GridDotNetConfiguration(GridDotNetConfiguration cfg) {
if (cfg.getPortableConfiguration() != null)
@@ -80,6 +81,15 @@ public class GridDotNetConfiguration implements PortableMarshalAware {
this.assemblies = assemblies;
}
+ /**
+ * Creates a copy of this configuration by delegating to the copy constructor.
+ *
+ * @return New configuration instance built from this one.
+ */
+ public GridDotNetConfiguration copy() {
+ return new GridDotNetConfiguration(this);
+ }
+
/** {@inheritDoc} */
@Override public void writePortable(PortableWriter writer) throws PortableException {
PortableRawWriter rawWriter = writer.rawWriter();
@@ -92,7 +102,7 @@ public class GridDotNetConfiguration implements PortableMarshalAware {
@Override public void readPortable(PortableReader reader) throws PortableException {
PortableRawReader rawReader = reader.rawReader();
- portableCfg = (GridDotNetPortableConfiguration)rawReader.readObject();
+ portableCfg = rawReader.readObject();
assemblies = (List<String>)rawReader.<String>readCollection();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java
index ec5fb06..6398d0b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableConfiguration.java
@@ -45,7 +45,7 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware {
private boolean dfltMetadataEnabled = true;
/** Whether to cache deserialized value in IGridPortableObject */
- private boolean keepDeserialized = true;
+ private boolean dfltKeepDeserialized = true;
/**
* Default constructor.
@@ -73,7 +73,7 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware {
dfltIdMapper = cfg.getDefaultIdMapper();
dfltSerializer = cfg.getDefaultSerializer();
dfltMetadataEnabled = cfg.getDefaultMetadataEnabled();
- keepDeserialized = cfg.getKeepDeserialized();
+ dfltKeepDeserialized = cfg.getDefaultKeepDeserialized();
}
/**
@@ -163,15 +163,15 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware {
/**
* @return Flag indicates whether to cache deserialized value in IGridPortableObject.
*/
- public boolean getKeepDeserialized() {
- return keepDeserialized;
+ public boolean getDefaultKeepDeserialized() {
+ return dfltKeepDeserialized;
}
/**
* @param keepDeserialized Keep deserialized flag.
*/
- public void setKeepDeserialized(boolean keepDeserialized) {
- this.keepDeserialized = keepDeserialized;
+ public void setDefaultKeepDeserialized(boolean keepDeserialized) {
+ this.dfltKeepDeserialized = keepDeserialized;
}
/** {@inheritDoc} */
@@ -179,18 +179,12 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware {
PortableRawWriter rawWriter = writer.rawWriter();
rawWriter.writeCollection(typesCfg);
-
rawWriter.writeCollection(types);
-
rawWriter.writeString(dfltNameMapper);
-
rawWriter.writeString(dfltIdMapper);
-
rawWriter.writeString(dfltSerializer);
-
rawWriter.writeBoolean(dfltMetadataEnabled);
-
- rawWriter.writeBoolean(keepDeserialized);
+ rawWriter.writeBoolean(dfltKeepDeserialized);
}
/** {@inheritDoc} */
@@ -198,18 +192,12 @@ public class GridDotNetPortableConfiguration implements PortableMarshalAware {
PortableRawReader rawReader = reader.rawReader();
typesCfg = rawReader.readCollection();
-
types = rawReader.readCollection();
-
dfltNameMapper = rawReader.readString();
-
dfltIdMapper = rawReader.readString();
-
dfltSerializer = rawReader.readString();
-
dfltMetadataEnabled = rawReader.readBoolean();
-
- keepDeserialized = rawReader.readBoolean();
+ dfltKeepDeserialized = rawReader.readBoolean();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java
index 18e5a06..7e72ff3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/dotnet/GridDotNetPortableTypeConfiguration.java
@@ -68,6 +68,7 @@ public class GridDotNetPortableTypeConfiguration implements PortableMarshalAware
serializer = cfg.getSerializer();
affinityKeyFieldName = cfg.getAffinityKeyFieldName();
metadataEnabled = cfg.getMetadataEnabled();
+ keepDeserialized = cfg.isKeepDeserialized();
}
/**
@@ -187,19 +188,12 @@ public class GridDotNetPortableTypeConfiguration implements PortableMarshalAware
PortableRawWriter rawWriter = writer.rawWriter();
rawWriter.writeString(assemblyName);
-
rawWriter.writeString(typeName);
-
rawWriter.writeString(nameMapper);
-
rawWriter.writeString(idMapper);
-
rawWriter.writeString(serializer);
-
rawWriter.writeString(affinityKeyFieldName);
-
rawWriter.writeObject(metadataEnabled);
-
rawWriter.writeObject(keepDeserialized);
}
@@ -208,19 +202,12 @@ public class GridDotNetPortableTypeConfiguration implements PortableMarshalAware
PortableRawReader rawReader = reader.rawReader();
assemblyName = rawReader.readString();
-
typeName = rawReader.readString();
-
nameMapper = rawReader.readString();
-
idMapper = rawReader.readString();
-
serializer = rawReader.readString();
-
affinityKeyFieldName = rawReader.readString();
-
metadataEnabled = (Boolean)rawReader.readObject();
-
keepDeserialized = (Boolean)rawReader.readObject();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java
index 5028599..81a8c1f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java
@@ -525,15 +525,15 @@ public class GridGainEx {
*
* @param springCfgPath Spring config path.
* @param gridName Grid name.
- * @param envPtr Environment pointer.
+ * @param cfgClo Configuration closure.
* @return Started Grid.
* @throws IgniteCheckedException If failed.
*/
- public static Ignite startInterop(@Nullable String springCfgPath, @Nullable String gridName, long envPtr)
- throws IgniteCheckedException {
- GridInteropProcessorAdapter.ENV_PTR.set(envPtr);
+ public static Ignite startInterop(@Nullable String springCfgPath, @Nullable String gridName,
+ IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo) throws IgniteCheckedException {
+ URL url = resolveSpringUrl(springCfgPath);
- return start(springCfgPath, gridName);
+ return start(url, gridName, null, cfgClo);
}
/**
@@ -654,21 +654,7 @@ public class GridGainEx {
*/
public static Ignite start(String springCfgPath, @Nullable String gridName,
@Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
- A.notNull(springCfgPath, "springCfgPath");
-
- URL url;
-
- try {
- url = new URL(springCfgPath);
- }
- catch (MalformedURLException e) {
- url = U.resolveGridGainUrl(springCfgPath);
-
- if (url == null)
- throw new IgniteCheckedException("Spring XML configuration path is invalid: " + springCfgPath +
- ". Note that this path should be either absolute or a relative local file system path, " +
- "relative to META-INF in classpath or valid URL to GRIDGAIN_HOME.", e);
- }
+ URL url = resolveSpringUrl(springCfgPath);
return start(url, gridName, springCtx);
}
@@ -716,6 +702,23 @@ public class GridGainEx {
*/
public static Ignite start(URL springCfgUrl, @Nullable String gridName,
@Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
+ return start(springCfgUrl, gridName, springCtx, null);
+ }
+
+ /**
+ * Internal Spring-based start routine.
+ *
+ * @param springCfgUrl Spring XML configuration file URL. This cannot be {@code null}.
+ * @param gridName Grid name that will override default.
+ * @param springCtx Optional Spring application context.
+ * @param cfgClo Optional closure to change configuration before it is used to start the grid.
+ * @return Started grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ private static Ignite start(URL springCfgUrl, @Nullable String gridName,
+ @Nullable GridSpringResourceContext springCtx,
+ @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo)
+ throws IgniteCheckedException {
A.notNull(springCfgUrl, "springCfgUrl");
boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
@@ -751,6 +754,12 @@ public class GridGainEx {
if (cfg.getGridName() == null && !F.isEmpty(gridName))
cfg.setGridName(gridName);
+ if (cfgClo != null) {
+ cfg = cfgClo.apply(cfg);
+
+ assert cfg != null;
+ }
+
// Use either user defined context or our one.
GridNamedInstance grid = start0(
new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx));
@@ -781,6 +790,33 @@ public class GridGainEx {
}
/**
+ * Resolve Spring configuration URL.
+ *
+ * @param springCfgPath Spring XML configuration file path or URL. This cannot be {@code null}.
+ * @return URL.
+ * @throws IgniteCheckedException If failed.
+ */
+ private static URL resolveSpringUrl(String springCfgPath) throws IgniteCheckedException {
+ A.notNull(springCfgPath, "springCfgPath");
+
+ URL url;
+
+ try {
+ url = new URL(springCfgPath); // First attempt: treat the argument as a URL string.
+ }
+ catch (MalformedURLException e) {
+ url = U.resolveGridGainUrl(springCfgPath); // Not a well-formed URL: try resolving it as a path instead.
+
+ if (url == null) // Path resolution produced nothing either; report it, keeping the original cause.
+ throw new IgniteCheckedException("Spring XML configuration path is invalid: " + springCfgPath +
+ ". Note that this path should be either absolute or a relative local file system path, " +
+ "relative to META-INF in classpath or valid URL to GRIDGAIN_HOME.", e);
+ }
+
+ return url;
+ }
+
+ /**
* Starts grid with given configuration.
*
* @param startCtx Start context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java
index f77be58..da818d4 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridNodeAttributes.java
@@ -22,7 +22,7 @@ package org.gridgain.grid.kernal;
*/
public final class GridNodeAttributes {
/** Prefix for internally reserved attribute names. */
- static final String ATTR_PREFIX = "org.gridgain";
+ public static final String ATTR_PREFIX = "org.gridgain";
/** Node compound version. */
public static final String ATTR_BUILD_VER = ATTR_PREFIX + ".build.ver";
@@ -136,9 +136,6 @@ public final class GridNodeAttributes {
/** Cache interceptors. */
public static final String ATTR_CACHE_INTERCEPTORS = ATTR_PREFIX + ".cache.interceptors";
- /** Native platform. */
- public static final String ATTR_INTEROP_PLATFORM = ATTR_PREFIX + ".interop.platform";
-
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 97d914a..991cc74 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -399,7 +399,18 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** {@inheritDoc} */
@Override public <K1, V1> GridCacheProjection<K1, V1> keepPortable() {
- GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>(
+ GridCacheProjectionImpl<K1, V1> prj = keepPortable0();
+
+ return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj);
+ }
+
+ /**
+ * Internal routine to get "keep-portable" projection.
+ *
+ * @return Projection with "keep-portable" flag.
+ */
+ public <K1, V1> GridCacheProjectionImpl<K1, V1> keepPortable0() {
+ return new GridCacheProjectionImpl<>(
(GridCacheProjection<K1, V1>)this,
(GridCacheContext<K1, V1>)ctx,
null,
@@ -407,8 +418,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
null,
null,
ctx.portableEnabled());
-
- return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index 841f961..ff34474 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -1063,7 +1063,7 @@ public class GridCacheContext<K, V> implements Externalizable {
*
* @param prj Flags to set.
*/
- void projectionPerCall(@Nullable GridCacheProjectionImpl<K, V> prj) {
+ public void projectionPerCall(@Nullable GridCacheProjectionImpl<K, V> prj) {
if (nearContext())
dht().near().context().prjPerCall.set(prj);
else
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
index d27ac8f..adfab5e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
@@ -406,7 +406,7 @@ public class GridCacheEntryImpl<K, V> implements GridCacheEntry<K, V>, Externali
/** {@inheritDoc} */
@Nullable @Override public V get() throws IgniteCheckedException {
- return proxy.get(key, isNearEnabled(ctx) ? null : cached, true);
+ return proxy.get(key, isNearEnabled(ctx) ? null : cached, !ctx.keepPortable());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
index 66defaa..e881aa9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
@@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.cache.query.continuous;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry;
@@ -56,6 +55,9 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou
/** Projection predicate */
private final IgnitePredicate<GridCacheEntry<K, V>> prjPred;
+ /** Keep portable flag. */
+ private final boolean keepPortable;
+
/** Logger. */
private final IgniteLogger log;
@@ -98,6 +100,8 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou
this.topic = topic;
this.prjPred = prjPred;
+ keepPortable = ctx.keepPortable();
+
log = ctx.logger(getClass());
}
@@ -279,9 +283,12 @@ public class GridCacheContinuousQueryAdapter<K, V> implements GridCacheContinuou
guard.block();
- GridContinuousHandler hnd = ctx.kernalContext().security().enabled() ?
- new GridCacheContinuousQueryHandlerV2<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal,
+ GridContinuousHandler hnd = ctx.kernalContext().security().enabled() ? keepPortable ?
+ new GridCacheContinuousQueryHandlerV4<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal,
ctx.kernalContext().job().currentTaskNameHash()) :
+ new GridCacheContinuousQueryHandlerV2<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal,
+ ctx.kernalContext().job().currentTaskNameHash()) : keepPortable ?
+ new GridCacheContinuousQueryHandlerV3<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal) :
new GridCacheContinuousQueryHandler<>(ctx.name(), topic, locCb, rmtFilter, prjPred, internal);
routineId = ctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, autoUnsubscribe,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java
new file mode 100644
index 0000000..21266b7
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query.continuous;
+
+
+import org.apache.ignite.lang.*;
+
+/**
+ * Extended continuous query filter: an entry predicate that additionally receives an unregister callback.
+ */
+public interface GridCacheContinuousQueryFilterEx<K, V> extends
+ IgnitePredicate<org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry<K, V>> {
+ /**
+ * Callback for query unregister event; the query handler's {@code onUnregister()} invokes it on filters of this type.
+ */
+ public void onQueryUnregister();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
index 61f0098..a7ff429 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
@@ -77,6 +77,8 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/**
+ * Constructor.
+ *
* @param cacheName Cache name.
* @param topic Topic for ordered messages.
* @param cb Local callback.
@@ -214,6 +216,12 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
+ /** {@inheritDoc} */
+ @Override public void onUnregister() {
+ if (filter != null && filter instanceof GridCacheContinuousQueryFilterEx) // NOTE(review): null check is redundant, instanceof is false for null.
+ ((GridCacheContinuousQueryFilterEx)filter).onQueryUnregister(); // Only extended filters expose the unregister callback.
+ }
+
private boolean checkProjection(GridCacheContinuousQueryEntry<K, V> e) {
GridCacheProjectionImpl.FullFilter<K, V> filter = (GridCacheProjectionImpl.FullFilter<K, V>)prjPred;
@@ -254,7 +262,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** {@inheritDoc} */
@Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
- manager(ctx).iterate(internal, routineId);
+ manager(ctx).iterate(internal, routineId, keepPortable());
}
/** {@inheritDoc} */
@@ -415,6 +423,13 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/**
+ * @return Keep portable flag.
+ */
+ protected boolean keepPortable() {
+ return false;
+ }
+
+ /**
* Deployable object.
*/
private static class DeployableObject implements Externalizable {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java
new file mode 100644
index 0000000..e008586
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV3.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query.continuous;
+
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Continuous query handler used when "keepPortable" flag is set; only {@link #keepPortable()} is overridden.
+ */
+public class GridCacheContinuousQueryHandlerV3<K, V> extends GridCacheContinuousQueryHandler<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Public no-arg constructor for {@link Externalizable}.
+ */
+ public GridCacheContinuousQueryHandlerV3() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheName Cache name, may be {@code null}.
+ * @param topic Topic for ordered messages.
+ * @param cb Local callback.
+ * @param filter Filter, may be {@code null}.
+ * @param prjPred Projection predicate, may be {@code null}.
+ * @param internal If {@code true} then query is notified about internal entries updates.
+ */
+ public GridCacheContinuousQueryHandlerV3(@Nullable String cacheName, Object topic,
+ IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> cb,
+ @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter,
+ @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal) {
+ super(cacheName, topic, cb, filter, prjPred, internal); // All arguments are passed to the base handler unchanged.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean keepPortable() {
+ return true; // Constant: this handler type is the keep-portable variant of the base handler.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java
new file mode 100644
index 0000000..a183cce
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandlerV4.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gridgain.grid.kernal.processors.cache.query.continuous;
+
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.query.GridCacheContinuousQueryEntry;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Continuous query handler used when "keepPortable" flag is set and security is enabled.
+ */
+public class GridCacheContinuousQueryHandlerV4<K, V> extends GridCacheContinuousQueryHandlerV2<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Public no-arg constructor for {@link Externalizable}.
+ */
+ public GridCacheContinuousQueryHandlerV4() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheName Cache name, may be {@code null}.
+ * @param topic Topic for ordered messages.
+ * @param cb Local callback.
+ * @param filter Filter, may be {@code null}.
+ * @param prjPred Projection predicate, may be {@code null}.
+ * @param internal If {@code true} then query is notified about internal entries updates.
+ * @param taskHash Task hash, forwarded to the V2 handler.
+ */
+ public GridCacheContinuousQueryHandlerV4(@Nullable String cacheName, Object topic,
+ IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry<K, V>>> cb,
+ @Nullable IgnitePredicate<GridCacheContinuousQueryEntry<K, V>> filter,
+ @Nullable IgnitePredicate<GridCacheEntry<K, V>> prjPred, boolean internal, int taskHash) {
+ super(cacheName, topic, cb, filter, prjPred, internal, taskHash); // All arguments are passed to the V2 handler unchanged.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean keepPortable() {
+ return true; // Constant: this handler type is the keep-portable variant of the V2 handler.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
index 461fc0e..a0cf134 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java
@@ -33,4 +33,9 @@ interface GridCacheContinuousQueryListener<K, V> {
* @param recordEvt Whether to record event.
*/
public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt);
+
+ /**
+ * Callback invoked when this listener is unregistered.
+ */
+ public void onUnregister();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
index ac5bce0..2d8e106 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java
@@ -129,6 +129,7 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
* @param internal Internal flag.
* @return Whether listener was actually registered.
*/
+ @SuppressWarnings("UnusedParameters")
boolean registerListener(UUID nodeId, UUID lsnrId, GridCacheContinuousQueryListener<K, V> lsnr, boolean internal) {
ListenerInfo<K, V> info = new ListenerInfo<>(lsnr);
@@ -158,13 +159,21 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
* @param id Listener ID.
*/
void unregisterListener(boolean internal, UUID id) {
+ ListenerInfo info; // NOTE(review): raw type; iterate() declares ListenerInfo<K, V> - consider parameterizing.
+
if (internal) {
- if (intLsnrs.remove(id) != null)
+ if ((info = intLsnrs.remove(id)) != null) { // Proceed only if something was removed for this ID.
intLsnrCnt.decrementAndGet();
+
+ info.lsnr.onUnregister(); // Notify the removed listener.
+ }
}
else {
- if (lsnrs.remove(id) != null)
+ if ((info = lsnrs.remove(id)) != null) { // Same handling for non-internal listeners.
lsnrCnt.decrementAndGet();
+
+ info.lsnr.onUnregister(); // Notify the removed listener.
+ }
}
}
@@ -173,27 +182,43 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt
*
* @param internal Internal flag.
* @param id Listener ID.
+ * @param keepPortable Keep portable flag.
*/
- void iterate(boolean internal, UUID id) {
+ @SuppressWarnings("unchecked")
+ void iterate(boolean internal, UUID id, boolean keepPortable) {
ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id);
assert info != null;
- Set<GridCacheEntry<K, V>> entries;
+ GridCacheProjectionImpl<K, V> oldPrj = null; // Previous per-call projection; restored in finally.
- if (cctx.isReplicated())
- entries = internal ? cctx.cache().entrySetx() :
- cctx.cache().entrySet();
- else
- entries = internal ? cctx.cache().primaryEntrySetx() :
- cctx.cache().primaryEntrySet();
+ try {
+ if (keepPortable) {
+ oldPrj = cctx.projectionPerCall(); // Remember the current per-call projection.
- for (GridCacheEntry<K, V> e : entries) {
- info.onIterate(new GridCacheContinuousQueryEntry<>(cctx, e, e.getKey(), e.getValue(), null, null, null),
- !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ));
- }
+ cctx.projectionPerCall(cctx.cache().<K, V>keepPortable0()); // Install the keepPortable0() projection.
+ }
+
+ Set<GridCacheEntry<K, V>> entries;
- info.flushPending();
+ if (cctx.isReplicated()) // Replicated cache: entry set; otherwise: primary entry set.
+ entries = internal ? cctx.cache().entrySetx() :
+ cctx.cache().entrySet();
+ else
+ entries = internal ? cctx.cache().primaryEntrySetx() :
+ cctx.cache().primaryEntrySet();
+
+ for (GridCacheEntry<K, V> e : entries) {
+ info.onIterate(new GridCacheContinuousQueryEntry<>(cctx, e, e.getKey(), e.getValue(), null, null, null),
+ !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ));
+ }
+
+ info.flushPending();
+ }
+ finally {
+ if (keepPortable)
+ cctx.projectionPerCall(oldPrj); // Put back the projection saved above.
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0c28abd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java
index f2d76f5..e86826e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropProcessorAdapter.java
@@ -24,9 +24,6 @@ import org.gridgain.grid.kernal.processors.*;
* Interop processor adapter.
*/
public abstract class GridInteropProcessorAdapter extends GridProcessorAdapter implements GridInteropProcessor {
- /** Managed environment pointer. */
- public static final ThreadLocal<Long> ENV_PTR = new ThreadLocal<>();
-
/** {@inheritDoc} */
protected GridInteropProcessorAdapter(GridKernalContext ctx) {
super(ctx);