You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/04 09:06:07 UTC
[01/10] ignite git commit: IGNITE-2621: Correct handling for tasks in
mixed-platform cluster. This closes #505.
Repository: ignite
Updated Branches:
refs/heads/ignite-1232 4667293cf -> 4b9682c18
IGNITE-2621: Correct handling for tasks in mixed-platform cluster. This closes #505.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d38eb8f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d38eb8f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d38eb8f
Branch: refs/heads/ignite-1232
Commit: 2d38eb8fe2567bba86a736a6cec7bb85808d1d60
Parents: ee01b61
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Mar 3 09:52:13 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 09:52:13 2016 +0300
----------------------------------------------------------------------
.../processors/platform/PlatformContext.java | 9 +-
.../platform/PlatformContextImpl.java | 13 +-
.../platform/PlatformProcessorImpl.java | 14 +--
.../platform/compute/PlatformCompute.java | 23 +++-
.../platform/PlatformStartIgniteTask.java | 77 ++++++++++++
.../ignite/platform/PlatformStopIgniteTask.java | 74 +++++++++++
.../Apache.Ignite.Core.Tests.csproj | 1 +
.../Compute/MixedClusterTest.cs | 123 +++++++++++++++++++
.../Config/Compute/compute-grid1.xml | 1 -
.../Config/Compute/compute-grid2.xml | 15 ++-
.../IgniteConfigurationTest.cs | 17 +--
.../Apache.Ignite.Core.Tests/TestUtils.cs | 33 ++++-
12 files changed, 365 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index b05d331..e88d57b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -272,4 +272,11 @@ public interface PlatformContext {
* @return Cluster node filter.
*/
public PlatformClusterNodeFilter createClusterNodeFilter(Object filter);
-}
+
+ /**
+ * Gets the current platform name.
+ *
+ * @return Current platform name.
+ */
+ public String platform();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
index c531718..b45414a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
@@ -108,6 +108,9 @@ public class PlatformContextImpl implements PlatformContext {
/** Node ids that has been sent to native platform. */
private final Set<UUID> sentNodes = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
+ /** Platform name. */
+ private final String platform;
+
/**
* Static initializer.
*/
@@ -142,11 +145,14 @@ public class PlatformContextImpl implements PlatformContext {
* @param ctx Kernal context.
* @param gate Callback gateway.
* @param mem Memory manager.
+ * @param platform Platform name.
*/
- public PlatformContextImpl(GridKernalContext ctx, PlatformCallbackGateway gate, PlatformMemoryManagerImpl mem) {
+ public PlatformContextImpl(GridKernalContext ctx, PlatformCallbackGateway gate, PlatformMemoryManagerImpl mem,
+ String platform) {
this.ctx = ctx;
this.gate = gate;
this.mem = mem;
+ this.platform = platform;
cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
@@ -621,6 +627,11 @@ public class PlatformContextImpl implements PlatformContext {
return new PlatformClusterNodeFilterImpl(filter, this);
}
+ /** {@inheritDoc} */
+ @Override public String platform() {
+ return platform;
+ }
+
/**
* Metadata holder.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
index 76967ff..95daa4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
@@ -26,11 +26,9 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.PlatformConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteComputeImpl;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
@@ -122,7 +120,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
U.warn(log, w);
}
- platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory());
+ platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory(), interopCfg.platform());
}
/** {@inheritDoc} */
@@ -209,12 +207,12 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- public void releaseStart() {
+ @Override public void releaseStart() {
startLatch.countDown();
}
/** {@inheritDoc} */
- public void awaitStart() throws IgniteCheckedException {
+ @Override public void awaitStart() throws IgniteCheckedException {
U.await(startLatch);
}
@@ -305,9 +303,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
@Override public PlatformTarget compute(PlatformTarget grp) {
PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
- assert grp0.projection() instanceof ClusterGroupAdapter; // Safety for very complex ClusterGroup hierarchy.
-
- return new PlatformCompute(platformCtx, (IgniteComputeImpl)((ClusterGroupAdapter)grp0.projection()).compute());
+ return new PlatformCompute(platformCtx, grp0.projection());
}
/** {@inheritDoc} */
@@ -343,7 +339,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
try {
if (stopped)
- throw new IgniteCheckedException("Failed to initialize interop store becuase node is stopping: " +
+ throw new IgniteCheckedException("Failed to initialize interop store because node is stopping: " +
store);
if (started)
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 10545d5..a1a82ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.compute;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.IgniteComputeImpl;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
@@ -65,18 +67,25 @@ public class PlatformCompute extends PlatformAbstractTarget {
/** Compute instance. */
private final IgniteComputeImpl compute;
+ /** Compute instance for platform-only nodes. */
+ private final IgniteComputeImpl computeForPlatform;
+
/** Future for previous asynchronous operation. */
protected ThreadLocal<IgniteInternalFuture> curFut = new ThreadLocal<>();
/**
* Constructor.
*
* @param platformCtx Context.
- * @param compute Compute instance.
+ * @param grp Cluster group.
*/
- public PlatformCompute(PlatformContext platformCtx, IgniteComputeImpl compute) {
+ public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp) {
super(platformCtx);
- this.compute = compute;
+ compute = (IgniteComputeImpl)grp.ignite().compute(grp);
+
+ ClusterGroup platformGrp = grp.forAttribute(PlatformUtils.ATTR_PLATFORM, platformCtx.platform());
+
+ computeForPlatform = (IgniteComputeImpl)grp.ignite().compute(platformGrp);
}
/** {@inheritDoc} */
@@ -153,7 +162,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
((PlatformBalancingMultiClosureTask)task).jobs(jobs);
}
- platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes());
+ platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, computeForPlatform.clusterGroup().nodes());
return executeNative0(task);
}
@@ -195,7 +204,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
* @param topVer Topology version.
*/
public PlatformListenable executeNative(long taskPtr, long topVer) {
- final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer);
+ final PlatformFullTask task = new PlatformFullTask(platformCtx, computeForPlatform, taskPtr, topVer);
return executeNative0(task);
}
@@ -207,6 +216,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
*/
public void withTimeout(long timeout) {
compute.withTimeout(timeout);
+ computeForPlatform.withTimeout(timeout);
}
/**
@@ -214,6 +224,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
*/
public void withNoFailover() {
compute.withNoFailover();
+ computeForPlatform.withNoFailover();
}
/** <inheritDoc /> */
@@ -232,7 +243,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
* @param task Task.
*/
private PlatformListenable executeNative0(final PlatformAbstractTask task) {
- IgniteInternalFuture fut = compute.executeAsync(task, null);
+ IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null);
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java
new file mode 100644
index 0000000..c8eb13f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task to start an Ignite node.
+ */
+public class PlatformStartIgniteTask extends ComputeTaskAdapter<String, String> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable String arg) throws IgniteException {
+ return Collections.singletonMap(new PlatformStartIgniteJob(arg), F.first(subgrid));
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String reduce(List<ComputeJobResult> results) throws IgniteException {
+ ComputeJobResult res = results.get(0);
+
+ if (res.getException() != null)
+ throw res.getException();
+ else
+ return results.get(0).getData();
+ }
+
+ /**
+ * Job.
+ */
+ private static class PlatformStartIgniteJob extends ComputeJobAdapter {
+ /** */
+ private final String springConfig;
+
+ /**
+ * Ctor.
+ * @param springConfig Config.
+ */
+ private PlatformStartIgniteJob(String springConfig) {
+ this.springConfig = springConfig;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ Ignite ignite = Ignition.start(springConfig);
+
+ return ignite.name();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java
new file mode 100644
index 0000000..c0319ab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task to stop an Ignite node.
+ */
+public class PlatformStopIgniteTask extends ComputeTaskAdapter<String, Boolean> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable String arg) throws IgniteException {
+ return Collections.singletonMap(new PlatformStopIgniteJob(arg), F.first(subgrid));
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+ ComputeJobResult res = results.get(0);
+
+ if (res.getException() != null)
+ throw res.getException();
+ else
+ return results.get(0).getData();
+ }
+
+ /**
+ * Job.
+ */
+ private static class PlatformStopIgniteJob extends ComputeJobAdapter {
+ /** */
+ private final String gridName;
+
+ /**
+ * Ctor.
+ * @param gridName Name.
+ */
+ private PlatformStopIgniteJob(String gridName) {
+ this.gridName = gridName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ return Ignition.stop(gridName, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 4ba05e1..e72fb85 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -102,6 +102,7 @@
<Compile Include="Compute\FailoverTaskSelfTest.cs" />
<Compile Include="Compute\BinarizableClosureTaskTest.cs" />
<Compile Include="Compute\BinarizableTaskTest.cs" />
+ <Compile Include="Compute\MixedClusterTest.cs" />
<Compile Include="Compute\ResourceTaskTest.cs" />
<Compile Include="Compute\SerializableClosureTaskTest.cs" />
<Compile Include="Compute\TaskAdapterTest.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs
new file mode 100644
index 0000000..57ea892
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma warning disable 618 // SpringConfigUrl
+namespace Apache.Ignite.Core.Tests.Compute
+{
+ using System;
+ using System.Collections;
+ using System.Linq;
+ using Apache.Ignite.Core.Compute;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests compute in a cluster with Java-only and .NET nodes.
+ /// </summary>
+ public class MixedClusterTest
+ {
+ /** */
+ private const string SpringConfig = @"Config\Compute\compute-grid1.xml";
+
+ /** */
+ private const string SpringConfig2 = @"Config\Compute\compute-grid2.xml";
+
+ /** */
+ private const string StartTask = "org.apache.ignite.platform.PlatformStartIgniteTask";
+
+ /** */
+ private const string StopTask = "org.apache.ignite.platform.PlatformStopIgniteTask";
+
+ /// <summary>
+ /// Tests the compute.
+ /// </summary>
+ [Test]
+ public void TestCompute()
+ {
+ var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) {SpringConfigUrl = SpringConfig};
+
+ using (var ignite = Ignition.Start(cfg))
+ {
+ var javaNodeName = StartJavaNode(ignite, SpringConfig2);
+
+ try
+ {
+ Assert.IsTrue(ignite.WaitTopology(2));
+
+ TestDotNetTask(ignite);
+ TestJavaTask(ignite);
+ }
+ finally
+ {
+ StopJavaNode(ignite, javaNodeName);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Tests the dot net task.
+ /// </summary>
+ /// <param name="ignite">The ignite.</param>
+ private static void TestDotNetTask(IIgnite ignite)
+ {
+ var results = ignite.GetCompute().Broadcast(new ComputeFunc());
+
+ // There are two nodes, but only one can execute .NET jobs.
+ Assert.AreEqual(new[] {int.MaxValue}, results.ToArray());
+ }
+
+ /// <summary>
+ /// Tests the dot net task.
+ /// </summary>
+ /// <param name="ignite">The ignite.</param>
+ private static void TestJavaTask(IIgnite ignite)
+ {
+ // Java task can execute on both nodes.
+ var res = ignite.GetCompute().ExecuteJavaTask<ICollection>(ComputeApiTest.BroadcastTask, null);
+
+ Assert.AreEqual(2, res.Count);
+ }
+
+ /// <summary>
+ /// Starts the java node.
+ /// </summary>
+ private static string StartJavaNode(IIgnite grid, string config)
+ {
+ return grid.GetCompute().ExecuteJavaTask<string>(StartTask, config);
+ }
+
+ /// <summary>
+ /// Stops the java node.
+ /// </summary>
+ private static void StopJavaNode(IIgnite grid, string name)
+ {
+ grid.GetCompute().ExecuteJavaTask<object>(StopTask, name);
+ }
+
+ /// <summary>
+ /// Test func.
+ /// </summary>
+ [Serializable]
+ private class ComputeFunc : IComputeFunc<int>
+ {
+ /** <inheritdoc /> */
+ public int Invoke()
+ {
+ return int.MaxValue;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
index 3061773..78a30a8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
@@ -69,7 +69,6 @@
</bean>
</list>
</property>
-
</bean>
</property>
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
index ef29a89..b1e8235 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
@@ -62,10 +62,23 @@
</bean>
</list>
</property>
-
+ <property name="idMapper">
+ <bean class="org.apache.ignite.binary.BinaryBasicIdMapper">
+ <constructor-arg value="true"/>
+ </bean>
+ </property>
+ <property name="nameMapper">
+ <bean class="org.apache.ignite.binary.BinaryBasicNameMapper">
+ <constructor-arg value="true"/>
+ </bean>
+ </property>
</bean>
</property>
+ <property name="marshaller">
+ <bean class="org.apache.ignite.internal.binary.BinaryMarshaller" />
+ </property>
+
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index 15f5804..3aa26d8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -24,7 +24,6 @@ namespace Apache.Ignite.Core.Tests
using System.Linq;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Common;
- using Apache.Ignite.Core.Discovery;
using Apache.Ignite.Core.Discovery.Tcp;
using Apache.Ignite.Core.Discovery.Tcp.Multicast;
using Apache.Ignite.Core.Discovery.Tcp.Static;
@@ -138,12 +137,12 @@ namespace Apache.Ignite.Core.Tests
using (var ignite = Ignition.Start(new IgniteConfiguration
{
Localhost = "127.0.0.1",
- DiscoverySpi = GetStaticDiscovery()
+ DiscoverySpi = TestUtils.GetStaticDiscovery()
}))
using (var ignite2 = Ignition.Start(new IgniteConfiguration
{
Localhost = "127.0.0.1",
- DiscoverySpi = GetStaticDiscovery(),
+ DiscoverySpi = TestUtils.GetStaticDiscovery(),
GridName = "client",
ClientMode = true
}))
@@ -351,17 +350,5 @@ namespace Apache.Ignite.Core.Tests
Localhost = "127.0.0.1"
};
}
-
- /// <summary>
- /// Gets the static discovery.
- /// </summary>
- /// <returns></returns>
- private static IDiscoverySpi GetStaticDiscovery()
- {
- return new TcpDiscoverySpi
- {
- IpFinder = new TcpDiscoveryStaticIpFinder {Endpoints = new[] {"127.0.0.1:47500", "127.0.0.1:47501"}}
- };
- }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
index f972cf7..9fe8f1c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
@@ -22,6 +22,9 @@ namespace Apache.Ignite.Core.Tests
using System.Collections.Generic;
using System.Linq;
using System.Threading;
+ using Apache.Ignite.Core.Discovery;
+ using Apache.Ignite.Core.Discovery.Tcp;
+ using Apache.Ignite.Core.Discovery.Tcp.Static;
using Apache.Ignite.Core.Impl;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Tests.Process;
@@ -212,7 +215,7 @@ namespace Apache.Ignite.Core.Tests
/// <returns>
/// <c>True</c> if topology took required size.
/// </returns>
- public static bool WaitTopology(this IIgnite grid, int size, int timeout)
+ public static bool WaitTopology(this IIgnite grid, int size, int timeout = 30000)
{
int left = timeout;
@@ -302,5 +305,33 @@ namespace Apache.Ignite.Core.Tests
return false;
}
+
+ /// <summary>
+ /// Gets the static discovery.
+ /// </summary>
+ public static IDiscoverySpi GetStaticDiscovery()
+ {
+ return new TcpDiscoverySpi
+ {
+ IpFinder = new TcpDiscoveryStaticIpFinder
+ {
+ Endpoints = new[] { "127.0.0.1:47500", "127.0.0.1:47501" }
+ }
+ };
+ }
+
+ /// <summary>
+ /// Gets the default code-based test configuration.
+ /// </summary>
+ public static IgniteConfiguration GetTestConfiguration()
+ {
+ return new IgniteConfiguration
+ {
+ DiscoverySpi = GetStaticDiscovery(),
+ Localhost = "127.0.0.1",
+ JvmOptions = TestJavaOptions(),
+ JvmClasspath = CreateTestClasspath()
+ };
+ }
}
}
[04/10] ignite git commit: IGFS: Minor refactoring.
Posted by sb...@apache.org.
IGFS: Minor refactoring.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0e85fda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0e85fda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0e85fda
Branch: refs/heads/ignite-1232
Commit: b0e85fdab48738b5db84554b52ae7978aa87ed95
Parents: cba4f4c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 3 12:56:55 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 12:56:55 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 55 +++++++++++---------
1 file changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0e85fda/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index a149b31..0ba78c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -480,7 +480,7 @@ public class IgfsMetaManager extends IgfsManager {
assert validTxState(false);
assert fileId != null;
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
// Lock file ID for this transaction.
@@ -494,7 +494,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsFileInfo newInfo = lockInfo(oldInfo, isDeleteLock);
- boolean put = metaCache.replace(fileId, oldInfo, newInfo);
+ boolean put = id2InfoPrj.replace(fileId, oldInfo, newInfo);
assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
@@ -571,7 +571,7 @@ public class IgfsMetaManager extends IgfsManager {
final boolean interrupted = Thread.interrupted();
try {
- IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() {
+ IgfsUtils.doInTransactionWithRetries(id2InfoPrj, new IgniteOutClosureX<Void>() {
@Override public Void applyx() throws IgniteCheckedException {
assert validTxState(true);
@@ -591,7 +591,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
- boolean put = metaCache.put(fileId, newInfo);
+ boolean put = id2InfoPrj.put(fileId, newInfo);
assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo
+ ']';
@@ -847,8 +847,6 @@ public class IgfsMetaManager extends IgfsManager {
if (!id2InfoPrj.putIfAbsent(fileId, newFileInfo))
throw fsException("Failed to add file details into cache: " + newFileInfo);
- assert metaCache.get(parentId) != null;
-
id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new IgfsListingEntry(newFileInfo), false));
return null;
@@ -890,7 +888,7 @@ public class IgfsMetaManager extends IgfsManager {
}
// 2. Start transaction.
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
// 3. Obtain the locks.
@@ -1094,9 +1092,6 @@ public class IgfsMetaManager extends IgfsManager {
" directory (file already exists) [fileId=" + fileId + ", destFileName=" + destFileName +
", destParentId=" + destParentId + ", destEntry=" + destEntry + ']'));
- assert metaCache.get(srcParentId) != null;
- assert metaCache.get(destParentId) != null;
-
// Remove listing entry from the source parent listing.
id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
@@ -1116,7 +1111,7 @@ public class IgfsMetaManager extends IgfsManager {
try {
assert validTxState(false);
- final IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ final IgniteInternalTx tx = startTx();
try {
// NB: We may lock root because its id is less than any other id:
@@ -1197,7 +1192,7 @@ public class IgfsMetaManager extends IgfsManager {
boolean added = allIds.add(TRASH_ID);
assert added;
- final IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ final IgniteInternalTx tx = startTx();
try {
final Map<IgniteUuid, IgfsFileInfo> infoMap = lockIds(allIds);
@@ -1360,7 +1355,7 @@ public class IgfsMetaManager extends IgfsManager {
assert listing != null;
assert validTxState(false);
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
Collection<IgniteUuid> res = new HashSet<>();
@@ -1449,7 +1444,7 @@ public class IgfsMetaManager extends IgfsManager {
try {
assert validTxState(false);
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
boolean res = false;
@@ -1630,7 +1625,7 @@ public class IgfsMetaManager extends IgfsManager {
try {
assert validTxState(false);
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
IgfsFileInfo info = updatePropertiesNonTx(parentId, fileId, fileName, props);
@@ -1700,8 +1695,7 @@ public class IgfsMetaManager extends IgfsManager {
if (log.isDebugEnabled())
log.debug("Update file info [fileId=" + fileId + ", c=" + c + ']');
- IgniteInternalTx tx = metaCache.isLockedByThread(fileId) ? null : metaCache.txStartEx(PESSIMISTIC,
- REPEATABLE_READ);
+ IgniteInternalTx tx = id2InfoPrj.isLockedByThread(fileId) ? null : startTx();
try {
// Lock file ID for this transaction.
@@ -1724,7 +1718,7 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException("Failed to update file info (file types differ)" +
" [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
- boolean b = metaCache.replace(fileId, oldInfo, newInfo);
+ boolean b = id2InfoPrj.replace(fileId, oldInfo, newInfo);
assert b : "Inconsistent transaction state [oldInfo=" + oldInfo + ", newInfo=" + newInfo +
", c=" + c + ']';
@@ -1771,7 +1765,7 @@ public class IgfsMetaManager extends IgfsManager {
b = new DirectoryChainBuilder(path, props, props);
// Start TX.
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet);
@@ -1856,7 +1850,7 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling);
@@ -2120,7 +2114,7 @@ public class IgfsMetaManager extends IgfsManager {
assert lockedInfo != null; // We checked the lock above.
- boolean put = metaCache.put(info.id(), lockedInfo);
+ boolean put = id2InfoPrj.put(info.id(), lockedInfo);
assert put;
@@ -2707,7 +2701,7 @@ public class IgfsMetaManager extends IgfsManager {
pathIds.add(fileIds(path));
// Start pessimistic.
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
// Lock the very first existing parents and possibly the leaf as well.
@@ -2909,16 +2903,25 @@ public class IgfsMetaManager extends IgfsManager {
* @return Transaction state is correct.
*/
private boolean validTxState(boolean inTx) {
- boolean txState = inTx == (metaCache.tx() != null);
+ boolean txState = inTx == (id2InfoPrj.tx() != null);
assert txState : (inTx ? "Method cannot be called outside transaction " :
- "Method cannot be called in transaction ") + "[tx=" + metaCache.tx() + ", threadId=" +
+ "Method cannot be called in transaction ") + "[tx=" + id2InfoPrj.tx() + ", threadId=" +
Thread.currentThread().getId() + ']';
return txState;
}
/**
+ * Start transaction on meta cache.
+ *
+ * @return Transaction.
+ */
+ private IgniteInternalTx startTx() {
+ return metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
* Updates last access and last modification times.
*
* @param parentId File parent ID.
@@ -2935,7 +2938,7 @@ public class IgfsMetaManager extends IgfsManager {
assert validTxState(false);
// Start pessimistic transaction.
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
Map<IgniteUuid, IgfsFileInfo> infoMap = lockIds(fileId, parentId);
@@ -3401,7 +3404,7 @@ public class IgfsMetaManager extends IgfsManager {
};
// Start Tx:
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ IgniteInternalTx tx = startTx();
try {
if (overwrite)
[07/10] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead
of factory when continuous query is created".
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 62ed66f..cdf4ffd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -28,11 +28,14 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
@@ -40,7 +43,9 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -55,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -132,6 +138,51 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
* @throws Exception If failed.
*/
+ public void testFilterAndFactoryProvided() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+ try {
+ final ContinuousQuery qry = new ContinuousQuery();
+
+ qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() {
+ @Override public CacheEntryEventFilter create() {
+ return null;
+ }
+ });
+
+ qry.setRemoteFilter(new CacheEntryEventSerializableFilter() {
+ @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
+ return false;
+ }
+ });
+
+ qry.setLocalListener(new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ // No-op.
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return cache.query(qry);
+ }
+ }, IgniteException.class, null);
+
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomicClient() throws Exception {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
1,
@@ -576,7 +627,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
* @param deploy The place where continuous query will be started.
* @throws Exception If failed.
*/
- private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+ protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
throws Exception {
ignite(0).createCache(ccfg);
@@ -1124,7 +1175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
* @param store If {@code true} configures dummy cache store.
* @return Cache configuration.
*/
- private CacheConfiguration<Object, Object> cacheConfiguration(
+ protected CacheConfiguration<Object, Object> cacheConfiguration(
CacheMode cacheMode,
int backups,
CacheAtomicityMode atomicityMode,
@@ -1176,7 +1227,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
*
*/
- static class QueryTestKey implements Serializable, Comparable {
+ public static class QueryTestKey implements Serializable, Comparable {
/** */
private final Integer key;
@@ -1219,12 +1270,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
*
*/
- static class QueryTestValue implements Serializable {
+ public static class QueryTestValue implements Serializable {
/** */
- private final Integer val1;
+ protected final Integer val1;
/** */
- private final String val2;
+ protected final String val2;
/**
* @param val Value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
new file mode 100644
index 0000000..359dd58
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Event filter for deployment.
+ */
+public class CacheDeploymentEntryEventFilter implements CacheEntryEventFilter<Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt)
+ throws CacheEntryListenerException {
+ return evt.getValue() == null || evt.getValue() % 2 != 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
new file mode 100644
index 0000000..0d6eceb
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+
+/**
+ * Event filter factory for deployment.
+ */
+public class CacheDeploymentEntryEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter<Integer, Integer> create() {
+ return new CacheDeploymentEntryEventFilter();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 968dbf6..083af1e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -79,12 +79,14 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -225,6 +227,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+ suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+ suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
suite.addTestSuite(CacheContinuousBatchAckTest.class);
suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
[08/10] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead
of factory when continuous query is created".
Posted by sb...@apache.org.
Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baa13122
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baa13122
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baa13122
Branch: refs/heads/ignite-1232
Commit: baa131220bf503da0908e4ecfee92966317e209c
Parents: c13339f
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 3 16:21:53 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 3 16:21:53 2016 +0300
----------------------------------------------------------------------
.../ignite/cache/query/ContinuousQuery.java | 35 +
.../processors/cache/IgniteCacheProxy.java | 4 +
.../continuous/CacheContinuousQueryHandler.java | 86 ++-
.../CacheContinuousQueryHandlerV2.java | 176 +++++
.../continuous/CacheContinuousQueryManager.java | 238 +++++--
.../continuous/GridContinuousProcessor.java | 7 +-
.../IgniteCacheEntryListenerAbstractTest.java | 75 +-
.../cache/IgniteCacheEntryListenerTxTest.java | 5 -
.../GridCacheReplicatedPreloadSelfTest.java | 39 +-
.../CacheContinuousQueryFactoryFilterTest.java | 714 +++++++++++++++++++
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
.../CacheContinuousQueryOperationP2PTest.java | 326 +++++++++
...acheContinuousQueryRandomOperationsTest.java | 63 +-
.../p2p/CacheDeploymentEntryEventFilter.java | 33 +
.../CacheDeploymentEntryEventFilterFactory.java | 31 +
.../IgniteCacheQuerySelfTestSuite.java | 4 +
16 files changed, 1706 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index df1bad3..3ea8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -18,6 +18,8 @@
package org.apache.ignite.cache.query;
import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -119,6 +121,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** Remote filter. */
private CacheEntryEventSerializableFilter<K, V> rmtFilter;
+ /** Remote filter factory. */
+ private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
+
/** Time interval. */
private long timeInterval = DFLT_TIME_INTERVAL;
@@ -196,7 +201,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
*
* @param rmtFilter Key-value filter.
* @return {@code this} for chaining.
+ *
+ * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
*/
+ @Deprecated
public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
this.rmtFilter = rmtFilter;
@@ -213,6 +221,33 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
}
/**
+ * Sets optional key-value filter factory. This factory produces filter is called before entry is
+ * sent to the master node.
+ * <p>
+ * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
+ * (e.g., synchronization or transactional cache operations), should be executed asynchronously
+ * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+ *
+ * @param rmtFilterFactory Key-value filter factory.
+ * @return {@code this} for chaining.
+ */
+ public ContinuousQuery<K, V> setRemoteFilterFactory(
+ Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+ this.rmtFilterFactory = rmtFilterFactory;
+
+ return this;
+ }
+
+ /**
+ * Gets remote filter.
+ *
+ * @return Remote filter.
+ */
+ public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
+ return rmtFilterFactory;
+ }
+
+ /**
* Sets time interval.
* <p>
* When a cache update happens, entry is first put into a buffer. Entries from buffer will
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 5ed8753..6e8bcbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -565,10 +565,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (qry.getLocalListener() == null)
throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
+ if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+ throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
try {
final UUID routineId = ctx.continuousQueries().executeQuery(
qry.getLocalListener(),
qry.getRemoteFilter(),
+ qry.getRemoteFilterFactory(),
qry.getPageSize(),
qry.getTimeInterval(),
qry.isAutoUnsubscribe(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 1938edb..10fbd89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
@@ -168,30 +168,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param topic Topic for ordered messages.
* @param locLsnr Local listener.
* @param rmtFilter Remote filter.
- * @param internal Internal flag.
- * @param notifyExisting Notify existing flag.
* @param oldValRequired Old value required flag.
* @param sync Synchronous flag.
* @param ignoreExpired Ignore expired events flag.
- * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
- * @param taskHash Task name hash code.
- * @param locCache {@code True} if local cache.
- * @param keepBinary Keep binary flag.
*/
public CacheContinuousQueryHandler(
String cacheName,
Object topic,
CacheEntryUpdatedListener<K, V> locLsnr,
CacheEntryEventSerializableFilter<K, V> rmtFilter,
- boolean internal,
- boolean notifyExisting,
boolean oldValRequired,
boolean sync,
boolean ignoreExpired,
- int taskHash,
- boolean skipPrimaryCheck,
- boolean locCache,
- boolean keepBinary,
boolean ignoreClsNotFound) {
assert topic != null;
assert locLsnr != null;
@@ -200,20 +188,49 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
this.topic = topic;
this.locLsnr = locLsnr;
this.rmtFilter = rmtFilter;
- this.internal = internal;
- this.notifyExisting = notifyExisting;
this.oldValRequired = oldValRequired;
this.sync = sync;
this.ignoreExpired = ignoreExpired;
- this.taskHash = taskHash;
- this.skipPrimaryCheck = skipPrimaryCheck;
- this.locCache = locCache;
- this.keepBinary = keepBinary;
this.ignoreClsNotFound = ignoreClsNotFound;
cacheId = CU.cacheId(cacheName);
}
+ /**
+ * @param internal Internal query.
+ */
+ public void internal(boolean internal) {
+ this.internal = internal;
+ }
+
+ /**
+ * @param notifyExisting Notify existing.
+ */
+ public void notifyExisting(boolean notifyExisting) {
+ this.notifyExisting = notifyExisting;
+ }
+
+ /**
+ * @param locCache Local cache.
+ */
+ public void localCache(boolean locCache) {
+ this.locCache = locCache;
+ }
+
+ /**
+ * @param taskHash Task hash.
+ */
+ public void taskNameHash(int taskHash) {
+ this.taskHash = taskHash;
+ }
+
+ /**
+ * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
+ */
+ public void skipPrimaryCheck(boolean skipPrimaryCheck) {
+ this.skipPrimaryCheck = skipPrimaryCheck;
+ }
+
/** {@inheritDoc} */
@Override public boolean isEvents() {
return false;
@@ -262,8 +279,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (locLsnr != null)
ctx.resource().injectGeneric(locLsnr);
- if (rmtFilter != null)
- ctx.resource().injectGeneric(rmtFilter);
+ final CacheEntryEventFilter filter = getEventFilter();
+
+ if (filter != null)
+ ctx.resource().injectGeneric(filter);
entryBufs = new ConcurrentHashMap<>();
@@ -303,7 +322,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
null,
null,
null,
- rmtFilter,
+ filter instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)filter : null,
null,
nodeId,
taskName()
@@ -332,9 +352,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean notify = !evt.entry().isFiltered();
- if (notify && rmtFilter != null) {
+ if (notify && filter != null) {
try {
- notify = rmtFilter.evaluate(evt);
+ notify = filter.evaluate(evt);
}
catch (Exception e) {
U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
@@ -422,7 +442,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
null,
null,
null,
- rmtFilter,
+ filter instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)filter : null,
null,
nodeId,
taskName(),
@@ -435,8 +456,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void onUnregister() {
- if (rmtFilter instanceof PlatformContinuousQueryFilter)
- ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
+ if (filter instanceof PlatformContinuousQueryFilter)
+ ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
}
@Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -517,6 +538,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
+ * @return Cache entry event filter.
+ */
+ public CacheEntryEventFilter getEventFilter() {
+ return rmtFilter;
+ }
+
+ /**
* @param cctx Context.
* @param nodeId ID of the node that started routine.
* @param entry Entry.
@@ -1189,7 +1217,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/**
* Deployable object.
*/
- private static class DeployableObject implements Externalizable {
+ protected static class DeployableObject implements Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -1214,7 +1242,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param ctx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
- private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+ protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
assert obj != null;
assert ctx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
new file mode 100644
index 0000000..7aef4dd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query handler V2 version. Contains {@link Factory} for remote listener.
+ */
+public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHandler<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Remote filter factory. */
+ private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+
+ /** Deployable object for filter factory. */
+ private DeployableObject rmtFilterFactoryDep;
+
+ /** Event types for JCache API. */
+ private byte types;
+
+ /** */
+ protected transient CacheEntryEventFilter filter;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public CacheContinuousQueryHandlerV2() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheName Cache name.
+ * @param topic Topic for ordered messages.
+ * @param locLsnr Local listener.
+ * @param rmtFilterFactory Remote filter factory.
+ * @param oldValRequired Old value required flag.
+ * @param sync Synchronous flag.
+ * @param ignoreExpired Ignore expired events flag.
+ * @param types Event types.
+ */
+ public CacheContinuousQueryHandlerV2(
+ String cacheName,
+ Object topic,
+ CacheEntryUpdatedListener<K, V> locLsnr,
+ Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+ boolean oldValRequired,
+ boolean sync,
+ boolean ignoreExpired,
+ boolean ignoreClsNotFound,
+ @Nullable Byte types) {
+ super(cacheName,
+ topic,
+ locLsnr,
+ null,
+ oldValRequired,
+ sync,
+ ignoreExpired,
+ ignoreClsNotFound);
+
+ assert rmtFilterFactory != null;
+
+ this.rmtFilterFactory = rmtFilterFactory;
+
+ if (types != null) {
+ assert types != 0;
+
+ this.types = types;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter getEventFilter() {
+ if (filter == null) {
+ assert rmtFilterFactory != null;
+
+ Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory;
+
+ filter = factory.create();
+
+ if (types != 0)
+ filter = new JCacheQueryRemoteFilter(filter, types);
+ }
+
+ return filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
+ super.p2pMarshal(ctx);
+
+ if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
+ rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+ super.p2pUnmarshal(nodeId, ctx);
+
+ if (rmtFilterFactoryDep != null)
+ rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridContinuousHandler clone() {
+ return super.clone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryHandlerV2.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ boolean b = rmtFilterFactoryDep != null;
+
+ out.writeBoolean(b);
+
+ if (b)
+ out.writeObject(rmtFilterFactoryDep);
+ else
+ out.writeObject(rmtFilterFactory);
+
+ out.writeByte(types);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ boolean b = in.readBoolean();
+
+ if (b)
+ rmtFilterFactoryDep = (DeployableObject)in.readObject();
+ else
+ rmtFilterFactory = (Factory)in.readObject();
+
+ types = in.readByte();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 409c1da..353043f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -23,15 +23,16 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Map;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
@@ -54,10 +55,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE;
/**
* Continuous queries manager.
@@ -413,28 +416,80 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
- public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
- CacheEntryEventSerializableFilter rmtFilter,
+ public UUID executeQuery(final CacheEntryUpdatedListener locLsnr,
+ @Nullable final CacheEntryEventSerializableFilter rmtFilter,
+ @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
boolean loc,
- boolean keepBinary) throws IgniteCheckedException
+ final boolean keepBinary) throws IgniteCheckedException
{
+ IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
+
+ if (rmtFilterFactory != null)
+ clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+ CacheContinuousQueryHandler hnd;
+
+ if (v2)
+ hnd = new CacheContinuousQueryHandlerV2(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilterFactory,
+ true,
+ false,
+ true,
+ false,
+ null);
+ else {
+ CacheEntryEventFilter fltr = rmtFilterFactory.create();
+
+ if (!(fltr instanceof CacheEntryEventSerializableFilter))
+ throw new IgniteException("Topology has nodes of the old versions. In this case " +
+ "EntryEventFilter should implement " +
+ "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr);
+
+ hnd = new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ (CacheEntryEventSerializableFilter)fltr,
+ true,
+ false,
+ true,
+ false);
+ }
+
+ return hnd;
+ }
+ };
+ else
+ clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean ignore) {
+ return new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilter,
+ true,
+ false,
+ true,
+ false);
+ }
+ };
+
return executeQuery0(
locLsnr,
- rmtFilter,
+ clsr,
bufSize,
timeInterval,
autoUnsubscribe,
false,
false,
- true,
- false,
- true,
loc,
- keepBinary,
- false);
+ keepBinary);
}
/**
@@ -445,27 +500,35 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
- public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
- CacheEntryEventSerializableFilter rmtFilter,
- boolean loc,
- boolean notifyExisting,
- boolean ignoreClassNotFound)
+ public UUID executeInternalQuery(final CacheEntryUpdatedListener<?, ?> locLsnr,
+ final CacheEntryEventSerializableFilter rmtFilter,
+ final boolean loc,
+ final boolean notifyExisting,
+ final boolean ignoreClassNotFound)
throws IgniteCheckedException
{
return executeQuery0(
locLsnr,
- rmtFilter,
+ new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) {
+ return new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilter,
+ true,
+ false,
+ true,
+ ignoreClassNotFound);
+ }
+ },
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
true,
notifyExisting,
- true,
- false,
- true,
loc,
- false,
- ignoreClassNotFound);
+ false);
}
/**
@@ -539,32 +602,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/**
* @param locLsnr Local listener.
- * @param rmtFilter Remote filter.
* @param bufSize Buffer size.
* @param timeInterval Time interval.
* @param autoUnsubscribe Auto unsubscribe flag.
* @param internal Internal flag.
* @param notifyExisting Notify existing flag.
- * @param oldValRequired Old value required flag.
- * @param sync Synchronous flag.
- * @param ignoreExpired Ignore expired event flag.
* @param loc Local flag.
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
- final CacheEntryEventSerializableFilter rmtFilter,
+ IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
boolean internal,
boolean notifyExisting,
- boolean oldValRequired,
- boolean sync,
- boolean ignoreExpired,
boolean loc,
- final boolean keepBinary,
- boolean ignoreClassNotFound) throws IgniteCheckedException
+ final boolean keepBinary) throws IgniteCheckedException
{
cctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -573,21 +628,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
- GridContinuousHandler hnd = new CacheContinuousQueryHandler(
- cctx.name(),
- TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
- locLsnr,
- rmtFilter,
- internal,
- notifyExisting,
- oldValRequired,
- sync,
- ignoreExpired,
- taskNameHash,
- skipPrimaryCheck,
- cctx.isLocal(),
- keepBinary,
- ignoreClassNotFound);
+ boolean v2 = useV2Protocol(cctx.discovery().allNodes());
+
+ final CacheContinuousQueryHandler hnd = clsr.apply(v2);
+
+ hnd.taskNameHash(taskNameHash);
+ hnd.skipPrimaryCheck(skipPrimaryCheck);
+ hnd.notifyExisting(notifyExisting);
+ hnd.internal(internal);
+ hnd.keepBinary(keepBinary);
+ hnd.localCache(cctx.isLocal());
IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -654,7 +704,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.kernalContext().cache().jcache(cctx.name()),
cctx, entry);
- if (rmtFilter != null && !rmtFilter.evaluate(next))
+ if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
next = null;
}
}
@@ -667,6 +717,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param nodes Nodes.
+ * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
+ * otherwise {@code false}.
+ */
+ private boolean useV2Protocol(Collection<ClusterNode> nodes) {
+ for (ClusterNode node : nodes) {
+ if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0)
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @param lsnrId Listener ID.
* @param lsnr Listener.
* @param internal Internal flag.
@@ -767,36 +831,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if (types == 0)
throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces.");
- CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
+ final byte types0 = types;
+
+ final CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
locLsnrImpl,
log);
- CacheEntryEventFilter fltr = null;
-
- if (cfg.getCacheEntryEventFilterFactory() != null) {
- fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
-
- if (!(fltr instanceof Serializable))
- throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
- + fltr);
- }
-
- CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
-
routineId = executeQuery0(
locLsnr,
- rmtFilter,
+ new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+ CacheContinuousQueryHandler hnd;
+ Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
+
+ v2 = rmtFilterFactory != null && v2;
+
+ if (v2)
+ hnd = new CacheContinuousQueryHandlerV2(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilterFactory,
+ cfg.isOldValueRequired(),
+ cfg.isSynchronous(),
+ false,
+ false,
+ types0);
+ else {
+ JCacheQueryRemoteFilter jCacheFilter;
+
+ CacheEntryEventFilter filter = null;
+
+ if (rmtFilterFactory != null) {
+ filter = rmtFilterFactory.create();
+
+ if (!(filter instanceof Serializable))
+ throw new IgniteException("Topology has nodes of the old versions. " +
+ "In this case EntryEventFilter must implement java.io.Serializable " +
+ "interface. Filter: " + filter);
+ }
+
+ jCacheFilter = new JCacheQueryRemoteFilter(filter, types0);
+
+ hnd = new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ jCacheFilter,
+ cfg.isOldValueRequired(),
+ cfg.isSynchronous(),
+ false,
+ false);
+ }
+
+ return hnd;
+ }
+ },
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
false,
false,
- cfg.isOldValueRequired(),
- cfg.isSynchronous(),
false,
- false,
- keepBinary,
- false);
+ keepBinary
+ );
}
/**
@@ -814,6 +912,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ *
*/
private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
/** */
@@ -896,8 +995,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * For handler version 2.0 this filter should not be serialized.
*/
- private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
+ protected static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -922,7 +1022,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param impl Filter.
* @param types Types.
*/
- JCacheQueryRemoteFilter(CacheEntryEventFilter impl, byte types) {
+ JCacheQueryRemoteFilter(@Nullable CacheEntryEventFilter impl, byte types) {
assert types != 0;
this.impl = impl;
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1ec69c2..1776748 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -110,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Threads started by this processor. */
private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap8<>();
+ /** */
+ public static final IgniteProductVersion QUERY_MSG_VER_2_SINCE = IgniteProductVersion.fromString("1.5.9");
+
/** */
private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
@@ -615,7 +619,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
startFuts.put(routineId, fut);
try {
- if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
+ if (locIncluded
+ && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
hnd.onListenerRegistered(routineId, ctx);
ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index e6bfd87..35fbbd5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerException;
@@ -99,6 +100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/** */
private boolean useObjects;
+ /** */
+ private static AtomicBoolean serialized = new AtomicBoolean(false);
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -138,6 +142,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
assertEquals(0, syncMsgFuts.size());
}
+
+ serialized.set(false);
}
/**
@@ -178,11 +184,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return new CreateUpdateRemoveExpireListener();
}
},
- new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
- @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
- return new ExceptionFilter();
- }
- },
+ new ExceptionFilterFactory(),
false,
false
);
@@ -443,18 +445,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
FactoryBuilder.factoryOf(lsnr),
- null,
+ new SerializableFactory(),
true,
false
));
try {
startGrid(gridCount());
+
+ jcache(0).put(1, 1);
}
finally {
stopGrid(gridCount());
}
+ jcache(0).put(2, 2);
+
+ assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get());
assertFalse(serialized.get());
}
@@ -1130,9 +1137,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+ private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+ @Override public CacheEntryEventFilter<Object, Object> create() {
return new TestFilter();
}
}
@@ -1184,7 +1191,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
assert evt != null;
@@ -1201,6 +1208,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return key % 2 == 0;
}
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ throw new UnsupportedOperationException("Filter must not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new UnsupportedOperationException("Filter must not be unmarshaled.");
+ }
}
/**
@@ -1355,6 +1372,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/**
+ *
+ */
+ public static class SerializableFactory implements Factory<NonSerializableFilter> {
+ /** {@inheritDoc} */
+ @Override public NonSerializableFilter create() {
+ return new NonSerializableFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ public static class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+ return true;
+ }
+ }
+
+ /**
*/
public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
/** */
@@ -1467,4 +1514,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return S.toString(ListenerTestValue.class, this);
}
}
+
+ /**
+ *
+ */
+ static class ExceptionFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+ return new ExceptionFilter();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index 41725e7..cad57f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -47,9 +47,4 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
}
-
- /** {@inheritDoc} */
- @Override public void testEvents(){
- fail("https://issues.apache.org/jira/browse/IGNITE-1600");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index ea2f27b..c6cd5af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -403,18 +404,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
}
}
},
- new Factory<CacheEntryEventSerializableFilter<Integer, Object>>() {
- /** {@inheritDoc} */
- @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
- try {
-
- return cls2.newInstance();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- },
+ new ClassFilterFactory(cls2),
true,
true
);
@@ -946,4 +936,29 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
return true;
}
}
+
+ /**
+ *
+ */
+ private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, Object>> {
+ /** */
+ private Class<CacheEntryEventSerializableFilter> cls;
+
+ /**
+ * @param cls Class.
+ */
+ public ClassFilterFactory(Class<CacheEntryEventSerializableFilter> cls) {
+ this.cls = cls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
+ try {
+ return cls.newInstance();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
new file mode 100644
index 0000000..6143fa9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest {
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private static final int KEYS = 50;
+
+ /** */
+ private static final int VALS = 10;
+
+ /** */
+ public static final int ITERATION_CNT = 40;
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInternalQuery() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+ UUID uuid = null;
+
+ try {
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ final CountDownLatch latch = new CountDownLatch(5);
+
+ CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ for (Object evt : iterable) {
+ latch.countDown();
+
+ log.info("Received event: " + evt);
+ }
+ }
+ };
+
+ uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+ .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+ for (int i = 10; i < 20; i++)
+ cache.put(i, i);
+
+ assertTrue(latch.await(3, SECONDS));
+ }
+ finally {
+ if (uuid != null)
+ grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+ .cancelInternalQuery(uuid);
+
+ cache.destroy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+ throws Exception {
+ ignite(0).createCache(ccfg);
+
+ try {
+ long seed = System.currentTimeMillis();
+
+ Random rnd = new Random(seed);
+
+ log.info("Random seed: " + seed);
+
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+ Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
+
+ if (deploy == CLIENT)
+ evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+ else if (deploy == SERVER)
+ evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+ rnd.nextBoolean()));
+ else {
+ boolean isSync = rnd.nextBoolean();
+
+ for (int i = 0; i < NODES - 1; i++)
+ evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
+ }
+
+ ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+ Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+ try {
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ if (i % 10 == 0)
+ log.info("Iteration: " + i);
+
+ for (int idx = 0; idx < NODES; idx++)
+ randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+ }
+ }
+ finally {
+ for (QueryCursor<?> cur : curs)
+ cur.close();
+
+ for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+ grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param nodeIdx Node index.
+ * @param curs Cursors.
+ * @param lsnrCfgs Listener configurations.
+ * @return Event queue
+ */
+ private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+ int nodeIdx,
+ Collection<QueryCursor<?>> curs,
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+ boolean sync) {
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+ new MutableCacheEntryListenerConfiguration<>(
+ FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+ @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ }),
+ new FilterFactory(),
+ true,
+ sync
+ );
+
+ grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+ lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+ }
+ else {
+ ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ });
+
+ qry.setRemoteFilterFactory(new FilterFactory());
+
+ QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+ curs.add(cur);
+ }
+
+ return evtsQueue;
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @param evtsQueues Events queue.
+ * @param expData Expected cache data.
+ * @param partCntr Partition counter.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void randomUpdate(
+ Random rnd,
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ ConcurrentMap<Object, Object> expData,
+ Map<Integer, Long> partCntr,
+ IgniteCache<Object, Object> cache)
+ throws Exception {
+ Object key = new QueryTestKey(rnd.nextInt(KEYS));
+ Object newVal = value(rnd);
+ Object oldVal = expData.get(key);
+
+ int op = rnd.nextInt(11);
+
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ Transaction tx = null;
+
+ if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+ tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+ try {
+ // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+ switch (op) {
+ case 0: {
+ cache.put(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 1: {
+ cache.getAndPut(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 2: {
+ cache.remove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 3: {
+ cache.getAndRemove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 4: {
+ cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 5: {
+ cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 6: {
+ cache.putIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 7: {
+ cache.getAndPutIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 8: {
+ cache.replace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 9: {
+ cache.getAndReplace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 10: {
+ if (oldVal != null) {
+ Object replaceVal = value(rnd);
+
+ boolean success = replaceVal.equals(oldVal);
+
+ if (success) {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+ }
+ else {
+ cache.replace(key, value(rnd), newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+
+ break;
+ }
+
+ default:
+ fail("Op:" + op);
+ }
+ } finally {
+ if (tx != null)
+ tx.close();
+ }
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionIsolation}.
+ */
+ private TransactionIsolation txRandomIsolation(Random rnd) {
+ int val = rnd.nextInt(3);
+
+ if (val == 0)
+ return READ_COMMITTED;
+ else if (val == 1)
+ return REPEATABLE_READ;
+ else
+ return SERIALIZABLE;
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionConcurrency}.
+ */
+ private TransactionConcurrency txRandomConcurrency(Random rnd) {
+ return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key
+ * @param cntrs Partition counters.
+ */
+ private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+ Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+ int part = aff.partition(key);
+
+ Long partCntr = cntrs.get(part);
+
+ if (partCntr == null)
+ partCntr = 0L;
+
+ cntrs.put(part, ++partCntr);
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @return Cache value.
+ */
+ private static Object value(Random rnd) {
+ return new QueryTestValue(rnd.nextInt(VALS));
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @param partCntrs Partition counters.
+ * @param aff Affinity function.
+ * @param key Key.
+ * @param val Value.
+ * @param oldVal Old value.
+ * @throws Exception If failed.
+ */
+ private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ Map<Integer, Long> partCntrs,
+ Affinity<Object> aff,
+ Object key,
+ Object val,
+ Object oldVal)
+ throws Exception {
+ if ((val == null && oldVal == null
+ || (val != null && !isAccepted((QueryTestValue)val)))) {
+ checkNoEvent(evtsQueues);
+
+ return;
+ }
+
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+ assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+ assertEquals(key, evt.getKey());
+ assertEquals(val, evt.getValue());
+ assertEquals(oldVal, evt.getOldValue());
+
+ long cntr = partCntrs.get(aff.partition(key));
+ CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+ assertNotNull(cntr);
+ assertNotNull(qryEntryEvt);
+
+ assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+ }
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @throws Exception If failed.
+ */
+ private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+ assertNull(evt);
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class NonSerializableFilter
+ implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+ CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
+ /** */
+ public NonSerializableFilter() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
+ throws CacheEntryListenerException {
+ return isAccepted(event.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ fail("Entry filter should not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fail("Entry filter should not be marshaled.");
+ }
+
+ /**
+ * @return {@code True} if value is even.
+ */
+ public static boolean isAccepted(QueryTestValue val) {
+ return val == null || val.val1 % 2 == 0;
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{
+ /** */
+ public SerializableFilter() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
+ throws CacheEntryListenerException {
+ return isAccepted(event.getValue());
+ }
+
+ /**
+ * @return {@code True} if value is even.
+ */
+ public static boolean isAccepted(Integer val) {
+ return val == null || val % 2 == 0;
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class FilterFactory implements Factory<NonSerializableFilter> {
+ @Override public NonSerializableFilter create() {
+ return new NonSerializableFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ public abstract class LocalNonSerialiseListener implements
+ CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+ CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+ Externalizable {
+ /** */
+ public LocalNonSerialiseListener() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /**
+ * @param evts Events.
+ */
+ protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts);
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index a42f056..f104f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1432,7 +1432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
if (hnd.isQuery() && hnd.cacheName() == null) {
- backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+ backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue");
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
new file mode 100644
index 0000000..97f9e0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+ cfg.setPeerClassLoadingEnabled(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(NODES - 1);
+
+ client = true;
+
+ startGrid(NODES - 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomic() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTx() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param isClient Client.
+ * @throws Exception If failed.
+ */
+ protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
+ throws Exception {
+ ignite(0).createCache(ccfg);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ QueryCursor<?> cur = null;
+
+ final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+ (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ TestLocalListener localLsnr = new TestLocalListener() {
+ @Override public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+ latch.countDown();
+
+ log.info("Received event: " + evt);
+ }
+ }
+ };
+
+ MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
+ new MutableCacheEntryListenerConfiguration<>(
+ new FactoryBuilder.SingletonFactory<>(localLsnr),
+ (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
+ (Object)evtFilterFactory.newInstance(),
+ true,
+ true
+ );
+
+ qry.setLocalListener(localLsnr);
+
+ qry.setRemoteFilterFactory(
+ (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
+
+ IgniteCache<Integer, Integer> cache = null;
+
+ try {
+ if (isClient)
+ cache = grid(NODES - 1).cache(ccfg.getName());
+ else
+ cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
+
+ cur = cache.query(qry);
+
+ cache.registerCacheEntryListener(lsnrCfg);
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+ }
+ finally {
+ if (cur != null)
+ cur.close();
+
+ if (cache != null)
+ cache.deregisterCacheEntryListener(lsnrCfg);
+ }
+ }
+
+ /**
+ *
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param atomicityMode Cache atomicity mode.
+ * @param memoryMode Cache memory mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setMemoryMode(memoryMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ private static abstract class TestLocalListener implements CacheEntryUpdatedListener<Integer, Integer>,
+ CacheEntryCreatedListener<Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ onEvent(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ onEvent(evts);
+ }
+
+ /**
+ * @param evts Events.
+ */
+ protected abstract void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts);
+ }
+}
[06/10] ignite git commit: IGNITE-2754: IGFS: Created separate
processor for listing remove operation.
Posted by sb...@apache.org.
IGNITE-2754: IGFS: Created separate processor for listing remove operation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c13339fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c13339fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c13339fe
Branch: refs/heads/ignite-1232
Commit: c13339fef50b34157d0b9d74ee42c24d89af79b2
Parents: bcb8b52
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 3 13:25:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 13:25:43 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 160 ++++++++++++-------
1 file changed, 105 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c13339fe/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 0ba78c5..c120b9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -43,6 +43,7 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
@@ -847,7 +848,7 @@ public class IgfsMetaManager extends IgfsManager {
if (!id2InfoPrj.putIfAbsent(fileId, newFileInfo))
throw fsException("Failed to add file details into cache: " + newFileInfo);
- id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new IgfsListingEntry(newFileInfo), false));
+ id2InfoPrj.invoke(parentId, new ListingAdd(fileName, new IgfsListingEntry(newFileInfo)));
return null;
}
@@ -956,8 +957,8 @@ public class IgfsMetaManager extends IgfsManager {
// 8. Actual move: remove from source parent and add to destination target.
IgfsListingEntry entry = srcTargetInfo.listing().get(srcName);
- id2InfoPrj.invoke(srcTargetId, new UpdateListing(srcName, entry, true));
- id2InfoPrj.invoke(dstTargetId, new UpdateListing(dstName, entry, false));
+ id2InfoPrj.invoke(srcTargetId, new ListingRemove(srcName, entry.fileId()));
+ id2InfoPrj.invoke(dstTargetId, new ListingAdd(dstName, entry));
tx.commit();
@@ -1093,10 +1094,10 @@ public class IgfsMetaManager extends IgfsManager {
", destParentId=" + destParentId + ", destEntry=" + destEntry + ']'));
// Remove listing entry from the source parent listing.
- id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
+ id2InfoPrj.invoke(srcParentId, new ListingRemove(srcFileName, srcEntry.fileId()));
// Add listing entry into the destination parent listing.
- id2InfoPrj.invoke(destParentId, new UpdateListing(destFileName, srcEntry, false));
+ id2InfoPrj.invoke(destParentId, new ListingAdd(destFileName, srcEntry));
}
/**
@@ -1134,8 +1135,8 @@ public class IgfsMetaManager extends IgfsManager {
id2InfoPrj.put(newInfo.id(), newInfo);
// Add new info to trash listing.
- id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(),
- new IgfsListingEntry(newInfo), false));
+ id2InfoPrj.invoke(TRASH_ID, new ListingAdd(newInfo.id().toString(),
+ new IgfsListingEntry(newInfo)));
// Remove listing entries from root.
// Note that root directory properties and other attributes are preserved:
@@ -1233,10 +1234,10 @@ public class IgfsMetaManager extends IgfsManager {
assert victimId.equals(srcEntry.fileId());
- id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
+ id2InfoPrj.invoke(srcParentId, new ListingRemove(srcFileName, srcEntry.fileId()));
// Add listing entry into the destination parent listing.
- id2InfoPrj.invoke(TRASH_ID, new UpdateListing(destFileName, srcEntry, false));
+ id2InfoPrj.invoke(TRASH_ID, new ListingAdd(destFileName, srcEntry));
if (victimInfo.isFile())
// Update a file info of the removed file with a file path,
@@ -1313,12 +1314,12 @@ public class IgfsMetaManager extends IgfsManager {
id2InfoPrj.getAndPut(newInfo.id(), newInfo);
// Add new info to trash listing.
- id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(),
- new IgfsListingEntry(newInfo), false));
+ id2InfoPrj.invoke(TRASH_ID, new ListingAdd(newInfo.id().toString(),
+ new IgfsListingEntry(newInfo)));
// Remove listing entries from root.
for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet())
- id2InfoPrj.invoke(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
+ id2InfoPrj.invoke(ROOT_ID, new ListingRemove(entry.getKey(), entry.getValue().fileId()));
resId = newInfo.id();
}
@@ -1468,7 +1469,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsListingEntry listingEntry = parentInfo.listing().get(name);
if (listingEntry != null)
- id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true));
+ id2InfoPrj.invoke(parentId, new ListingRemove(name, listingEntry.fileId()));
IgfsFileInfo deleted = id2InfoPrj.getAndRemove(id);
@@ -1599,7 +1600,7 @@ public class IgfsMetaManager extends IgfsManager {
assert id2InfoPrj.get(parentId) != null;
- id2InfoPrj.invoke(parentId, new UpdateListing(fileName, entry, false));
+ id2InfoPrj.invoke(parentId, new ListingAdd(fileName, entry));
}
return newInfo;
@@ -2009,9 +2010,9 @@ public class IgfsMetaManager extends IgfsManager {
id2InfoPrj.put(newInfo.id(), newInfo); // Put the new one.
id2InfoPrj.invoke(parentInfo.id(),
- new UpdateListing(path.name(), parentInfo.listing().get(path.name()), true));
+ new ListingRemove(path.name(), parentInfo.listing().get(path.name()).fileId()));
id2InfoPrj.invoke(parentInfo.id(),
- new UpdateListing(path.name(), new IgfsListingEntry(newInfo), false));
+ new ListingAdd(path.name(), new IgfsListingEntry(newInfo)));
IgniteInternalFuture<?> delFut = igfsCtx.data().delete(oldInfo);
}
@@ -3213,10 +3214,82 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
+ * Remove entry from directory listing.
+ */
+ @GridInternal
+ private static final class ListingRemove implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** File name. */
+ private String fileName;
+
+ /** Expected ID. */
+ private IgniteUuid fileId;
+
+ /**
+ * Default constructor.
+ */
+ public ListingRemove() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param fileName File name.
+ * @param fileId File ID.
+ */
+ public ListingRemove(String fileName, IgniteUuid fileId) {
+ this.fileName = fileName;
+ this.fileId = fileId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args)
+ throws EntryProcessorException {
+ IgfsFileInfo fileInfo = e.getValue();
+
+ assert fileInfo != null;
+ assert fileInfo.isDirectory();
+
+ Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+ listing.putAll(fileInfo.listing());
+
+ IgfsListingEntry oldEntry = listing.get(fileName);
+
+ if (oldEntry == null || !oldEntry.fileId().equals(fileId))
+ throw new IgniteException("Directory listing doesn't contain expected file" +
+ " [listing=" + listing + ", fileName=" + fileName + "]");
+
+ // Modify listing in-place.
+ listing.remove(fileName);
+
+ e.setValue(new IgfsFileInfo(listing, fileInfo));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, fileName);
+ U.writeGridUuid(out, fileId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fileName = U.readString(in);
+ fileId = U.readGridUuid(in);
+ }
+ }
+
+ /**
* Update directory listing closure.
*/
@GridInternal
- private static final class UpdateListing implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ private static final class ListingAdd implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -3227,30 +3300,25 @@ public class IgfsMetaManager extends IgfsManager {
/** File ID.*/
private IgfsListingEntry entry;
- /** Update operation: remove entry from listing if {@code true} or add entry to listing if {@code false}. */
- private boolean rmv;
-
/**
* Constructs update directory listing closure.
*
* @param fileName File name to add into parent listing.
* @param entry Listing entry to add or remove.
- * @param rmv Remove entry from listing if {@code true} or add entry to listing if {@code false}.
*/
- private UpdateListing(String fileName, IgfsListingEntry entry, boolean rmv) {
+ private ListingAdd(String fileName, IgfsListingEntry entry) {
assert fileName != null;
assert entry != null;
this.fileName = fileName;
this.entry = entry;
- this.rmv = rmv;
}
/**
* Empty constructor required for {@link Externalizable}.
*
*/
- public UpdateListing() {
+ public ListingAdd() {
// No-op.
}
@@ -3261,30 +3329,15 @@ public class IgfsMetaManager extends IgfsManager {
assert fileInfo != null : "File info not found for the child: " + entry.fileId();
assert fileInfo.isDirectory();
- Map<String, IgfsListingEntry> listing =
- U.newHashMap(fileInfo.listing().size() + (rmv ? 0 : 1));
-
- listing.putAll(fileInfo.listing());
-
- if (rmv) {
- IgfsListingEntry oldEntry = listing.get(fileName);
+ Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
- if (oldEntry == null || !oldEntry.fileId().equals(entry.fileId()))
- throw new IgniteException("Directory listing doesn't contain expected file" +
- " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry + ']');
+ // Modify listing in-place.
+ IgfsListingEntry oldEntry = listing.put(fileName, entry);
- // Modify listing in-place.
- listing.remove(fileName);
- }
- else {
- // Modify listing in-place.
- IgfsListingEntry oldEntry = listing.put(fileName, entry);
-
- if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId()))
- throw new IgniteException("Directory listing contains unexpected file" +
- " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry +
- ", oldEntry=" + oldEntry + ']');
- }
+ if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId()))
+ throw new IgniteException("Directory listing contains unexpected file" +
+ " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry +
+ ", oldEntry=" + oldEntry + ']');
e.setValue(new IgfsFileInfo(listing, fileInfo));
@@ -3295,19 +3348,17 @@ public class IgfsMetaManager extends IgfsManager {
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, fileName);
out.writeObject(entry);
- out.writeBoolean(rmv);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
fileName = U.readString(in);
entry = (IgfsListingEntry)in.readObject();
- rmv = in.readBoolean();
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(UpdateListing.class, this);
+ return S.toString(ListingAdd.class, this);
}
}
@@ -3482,11 +3533,11 @@ public class IgfsMetaManager extends IgfsManager {
assert deletedEntry != null;
- id2InfoPrj.invoke(parentId, new UpdateListing(name, deletedEntry, true));
+ id2InfoPrj.invoke(parentId, new ListingRemove(name, deletedEntry.fileId()));
// Add listing entry into the destination parent listing.
- id2InfoPrj.invoke(TRASH_ID, new UpdateListing(
- lowermostExistingInfo.id().toString(), deletedEntry, false));
+ id2InfoPrj.invoke(TRASH_ID, new ListingAdd(
+ lowermostExistingInfo.id().toString(), deletedEntry));
// Update a file info of the removed file with a file path,
// which will be used by delete worker for event notifications.
@@ -3505,7 +3556,7 @@ public class IgfsMetaManager extends IgfsManager {
assert put;
id2InfoPrj.invoke(parentId,
- new UpdateListing(name, new IgfsListingEntry(newFileInfo), false));
+ new ListingAdd(name, new IgfsListingEntry(newFileInfo)));
IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newFileInfo, parentId);
@@ -3679,8 +3730,7 @@ public class IgfsMetaManager extends IgfsManager {
throws IgniteCheckedException {
assert childInfo != null;
- id2InfoPrj.invoke(lowermostExistingId,
- new UpdateListing(childName, new IgfsListingEntry(childInfo), false));
+ id2InfoPrj.invoke(lowermostExistingId, new ListingAdd(childName, new IgfsListingEntry(childInfo)));
}
/**
[02/10] ignite git commit: IGNITE-1419 .Net: Added optional "raw"
flag to binary type configuration. This closes #376.
Posted by sb...@apache.org.
IGNITE-1419 .Net: Added optional "raw" flag to binary type configuration. This closes #376.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cba4f4c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cba4f4c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cba4f4c0
Branch: refs/heads/ignite-1232
Commit: cba4f4c039013a79e50078a92cc2df761156c66f
Parents: 2d38eb8
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Mar 3 09:58:40 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 09:58:40 2016 +0300
----------------------------------------------------------------------
.../utils/PlatformConfigurationUtils.java | 6 +-
.../Binary/BinarySelfTest.cs | 110 ++--
.../Apache.Ignite.Core.csproj | 2 +-
.../Binary/BinaryReflectiveSerializer.cs | 241 ++++++++
.../Multicast/TcpDiscoveryMulticastIpFinder.cs | 2 +-
.../Impl/Binary/BinaryReflectiveActions.cs | 576 +++++++++++++------
.../Impl/Binary/BinaryReflectiveSerializer.cs | 218 -------
.../Impl/Binary/BinaryWriter.cs | 28 +-
8 files changed, 749 insertions(+), 434 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 32ab812..c0e9f1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -594,10 +594,10 @@ import java.util.Map;
w.writeInt(multiFinder.getAddressRequestAttempts());
w.writeInt(multiFinder.getResponseWaitTime());
- Integer ttl = multiFinder.getTimeToLive();
- w.writeBoolean(ttl != null);
+ int ttl = multiFinder.getTimeToLive();
+ w.writeBoolean(ttl != -1);
- if (ttl != null)
+ if (ttl != -1)
w.writeInt(ttl);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
index f49a28a..24ce3c8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
@@ -727,16 +727,22 @@ namespace Apache.Ignite.Core.Tests.Binary
* <summary>Check write of primitive fields through reflection.</summary>
*/
[Test]
- public void TestPrimitiveFieldsReflective()
+ public void TestPrimitiveFieldsReflective([Values(false, true)] bool raw)
{
- ICollection<BinaryTypeConfiguration> typeCfgs =
- new List<BinaryTypeConfiguration>();
-
- typeCfgs.Add(new BinaryTypeConfiguration(typeof(PrimitiveFieldType)));
+ var serializer = new BinaryReflectiveSerializer {RawMode = raw};
- BinaryConfiguration cfg = new BinaryConfiguration {TypeConfigurations = typeCfgs};
+ Assert.AreEqual(raw, serializer.RawMode);
- Marshaller marsh = new Marshaller(cfg);
+ var marsh = new Marshaller(new BinaryConfiguration
+ {
+ TypeConfigurations = new[]
+ {
+ new BinaryTypeConfiguration(typeof (PrimitiveFieldType))
+ {
+ Serializer = serializer
+ }
+ }
+ });
PrimitiveFieldType obj = new PrimitiveFieldType();
@@ -927,15 +933,21 @@ namespace Apache.Ignite.Core.Tests.Binary
* <summary>Check write of object with enums.</summary>
*/
[Test]
- public void TestEnumsReflective()
+ public void TestEnumsReflective([Values(false, true)] bool raw)
{
Marshaller marsh =
new Marshaller(new BinaryConfiguration
{
TypeConfigurations = new[]
{
- new BinaryTypeConfiguration(typeof (EnumType)),
+ new BinaryTypeConfiguration(typeof (EnumType))
+ {
+ Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+ },
new BinaryTypeConfiguration(typeof (TestEnum))
+ {
+ Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+ }
}
});
@@ -951,18 +963,26 @@ namespace Apache.Ignite.Core.Tests.Binary
Assert.AreEqual(obj.GetHashCode(), portObj.GetHashCode());
- // Test enum field in binary form
- var binEnum = portObj.GetField<IBinaryObject>("PEnum");
- Assert.AreEqual(obj.PEnum.GetHashCode(), binEnum.GetHashCode());
- Assert.AreEqual((int) obj.PEnum, binEnum.EnumValue);
- Assert.AreEqual(obj.PEnum, binEnum.Deserialize<TestEnum>());
- Assert.AreEqual(obj.PEnum, binEnum.Deserialize<object>());
- Assert.AreEqual(typeof(TestEnum), binEnum.Deserialize<object>().GetType());
- Assert.AreEqual(null, binEnum.GetField<object>("someField"));
- Assert.IsFalse(binEnum.HasField("anyField"));
+ if (!raw)
+ {
+ // Test enum field in binary form
+ var binEnum = portObj.GetField<IBinaryObject>("PEnum");
+ Assert.AreEqual(obj.PEnum.GetHashCode(), binEnum.GetHashCode());
+ Assert.AreEqual((int) obj.PEnum, binEnum.EnumValue);
+ Assert.AreEqual(obj.PEnum, binEnum.Deserialize<TestEnum>());
+ Assert.AreEqual(obj.PEnum, binEnum.Deserialize<object>());
+ Assert.AreEqual(typeof (TestEnum), binEnum.Deserialize<object>().GetType());
+ Assert.AreEqual(null, binEnum.GetField<object>("someField"));
+ Assert.IsFalse(binEnum.HasField("anyField"));
- var binEnumArr = portObj.GetField<IBinaryObject[]>("PEnumArray");
- Assert.IsTrue(binEnumArr.Select(x => x.Deserialize<TestEnum>()).SequenceEqual(obj.PEnumArray));
+ var binEnumArr = portObj.GetField<IBinaryObject[]>("PEnumArray");
+ Assert.IsTrue(binEnumArr.Select(x => x.Deserialize<TestEnum>()).SequenceEqual(obj.PEnumArray));
+ }
+ else
+ {
+ Assert.IsFalse(portObj.HasField("PEnum"));
+ Assert.IsFalse(portObj.HasField("PEnumArray"));
+ }
EnumType newObj = portObj.Deserialize<EnumType>();
@@ -974,14 +994,20 @@ namespace Apache.Ignite.Core.Tests.Binary
* <summary>Check write of object with collections.</summary>
*/
[Test]
- public void TestCollectionsReflective()
+ public void TestCollectionsReflective([Values(false, true)] bool raw)
{
var marsh = new Marshaller(new BinaryConfiguration
{
- TypeConfigurations = new List<BinaryTypeConfiguration>
+ TypeConfigurations = new[]
{
- new BinaryTypeConfiguration(typeof (CollectionsType)),
+ new BinaryTypeConfiguration(typeof (CollectionsType))
+ {
+ Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+ },
new BinaryTypeConfiguration(typeof (InnerObjectType))
+ {
+ Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+ }
}
});
@@ -1043,29 +1069,39 @@ namespace Apache.Ignite.Core.Tests.Binary
* <summary>Check write of object fields through reflective serializer.</summary>
*/
[Test]
- public void TestObjectReflective()
+ public void TestObjectReflective([Values(false, true)] bool raw)
{
- ICollection<BinaryTypeConfiguration> typeCfgs =
- new List<BinaryTypeConfiguration>();
-
- typeCfgs.Add(new BinaryTypeConfiguration(typeof(OuterObjectType)));
- typeCfgs.Add(new BinaryTypeConfiguration(typeof(InnerObjectType)));
-
- BinaryConfiguration cfg = new BinaryConfiguration();
-
- cfg.TypeConfigurations = typeCfgs;
-
- Marshaller marsh = new Marshaller(cfg);
+ var marsh = new Marshaller(new BinaryConfiguration
+ {
+ TypeConfigurations = new[]
+ {
+ new BinaryTypeConfiguration(typeof (OuterObjectType))
+ {
+ Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+ },
+ new BinaryTypeConfiguration(typeof (InnerObjectType))
+ {
+ Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+ }
+ }
+ });
CheckObject(marsh, new OuterObjectType(), new InnerObjectType());
}
[Test]
- public void TestStructsReflective()
+ public void TestStructsReflective([Values(false, true)] bool raw)
{
var marsh = new Marshaller(new BinaryConfiguration
{
- TypeConfigurations = new[] {new BinaryTypeConfiguration(typeof (ReflectiveStruct))}
+ TypeConfigurations =
+ new[]
+ {
+ new BinaryTypeConfiguration(typeof (ReflectiveStruct))
+ {
+ Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+ }
+ }
});
var obj = new ReflectiveStruct(15, 28.8);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 1b66f0c..6acc7c4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -65,6 +65,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="Binary\BinaryReflectiveSerializer.cs" />
<Compile Include="Binary\Package-Info.cs" />
<Compile Include="Cache\CacheAtomicUpdateTimeoutException.cs" />
<Compile Include="Cache\CacheEntryProcessorException.cs" />
@@ -328,7 +329,6 @@
<Compile Include="Impl\Binary\BinaryReaderHandleDictionary.cs" />
<Compile Include="Impl\Binary\BinaryReader.cs" />
<Compile Include="Impl\Binary\BinaryReflectiveActions.cs" />
- <Compile Include="Impl\Binary\BinaryReflectiveSerializer.cs" />
<Compile Include="Impl\Binary\Binary.cs" />
<Compile Include="Impl\Binary\Structure\BinaryStructureTracker.cs" />
<Compile Include="Impl\Binary\BinarySurrogateTypeDescriptor.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs
new file mode 100644
index 0000000..02703be
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Binary
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using Apache.Ignite.Core.Impl.Binary;
+
+ /// <summary>
+ /// Binary serializer which reflectively writes all fields except of ones with
+ /// <see cref="System.NonSerializedAttribute"/>.
+ /// <para />
+ /// Note that Java platform stores dates as a difference between current time
+ /// and predefined absolute UTC date. Therefore, this difference is always the
+ /// same for all time zones. .NET, in contrast, stores dates as a difference
+ /// between current time and some predefined date relative to the current time
+ /// zone. It means that this difference will be different as you change time zones.
+ /// To overcome this discrepancy Ignite always converts .Net date to UTC form
+ /// before serializing and allows user to decide whether to deserialize them
+ /// in UTC or local form using <c>ReadTimestamp(..., true/false)</c> methods in
+ /// <see cref="IBinaryReader"/> and <see cref="IBinaryRawReader"/>.
+ /// This serializer always read dates in UTC form. It means that if you have
+ /// local date in any field/property, it will be implicitly converted to UTC
+ /// form after the first serialization-deserialization cycle.
+ /// </summary>
+ public sealed class BinaryReflectiveSerializer : IBinarySerializer
+ {
+ /** Cached binding flags. */
+ private const BindingFlags Flags = BindingFlags.Instance | BindingFlags.Public |
+ BindingFlags.NonPublic | BindingFlags.DeclaredOnly;
+
+ /** Cached type descriptors. */
+ private readonly IDictionary<Type, Descriptor> _types = new Dictionary<Type, Descriptor>();
+
+ /** Raw mode flag. */
+ private bool _rawMode;
+
+ /// <summary>
+ /// Gets or value indicating whether raw mode serialization should be used.
+ /// <para />
+ /// Raw mode does not include field names, improving performance and memory usage.
+ /// However, queries do not support raw objects.
+ /// </summary>
+ public bool RawMode
+ {
+ get { return _rawMode; }
+ set
+ {
+ if (_types.Count > 0)
+ throw new InvalidOperationException(typeof (BinarizableSerializer).Name +
+ ".RawMode cannot be changed after first serialization.");
+
+ _rawMode = value;
+ }
+ }
+
+ /// <summary>
+ /// Write portalbe object.
+ /// </summary>
+ /// <param name="obj">Object.</param>
+ /// <param name="writer">Writer.</param>
+ /// <exception cref="BinaryObjectException">Type is not registered in serializer: + type.Name</exception>
+ public void WriteBinary(object obj, IBinaryWriter writer)
+ {
+ var binarizable = obj as IBinarizable;
+
+ if (binarizable != null)
+ binarizable.WriteBinary(writer);
+ else
+ GetDescriptor(obj).Write(obj, writer);
+ }
+
+ /// <summary>
+ /// Read binary object.
+ /// </summary>
+ /// <param name="obj">Instantiated empty object.</param>
+ /// <param name="reader">Reader.</param>
+ /// <exception cref="BinaryObjectException">Type is not registered in serializer: + type.Name</exception>
+ public void ReadBinary(object obj, IBinaryReader reader)
+ {
+ var binarizable = obj as IBinarizable;
+
+ if (binarizable != null)
+ binarizable.ReadBinary(reader);
+ else
+ GetDescriptor(obj).Read(obj, reader);
+ }
+
+ /// <summary>Register type.</summary>
+ /// <param name="type">Type.</param>
+ /// <param name="typeId">Type ID.</param>
+ /// <param name="converter">Name converter.</param>
+ /// <param name="idMapper">ID mapper.</param>
+ internal void Register(Type type, int typeId, IBinaryNameMapper converter,
+ IBinaryIdMapper idMapper)
+ {
+ if (type.GetInterface(typeof(IBinarizable).Name) != null)
+ return;
+
+ List<FieldInfo> fields = new List<FieldInfo>();
+
+ Type curType = type;
+
+ while (curType != null)
+ {
+ foreach (FieldInfo field in curType.GetFields(Flags))
+ {
+ if (!field.IsNotSerialized)
+ fields.Add(field);
+ }
+
+ curType = curType.BaseType;
+ }
+
+ IDictionary<int, string> idMap = new Dictionary<int, string>();
+
+ foreach (FieldInfo field in fields)
+ {
+ string fieldName = BinaryUtils.CleanFieldName(field.Name);
+
+ int fieldId = BinaryUtils.FieldId(typeId, fieldName, converter, idMapper);
+
+ if (idMap.ContainsKey(fieldId))
+ {
+ throw new BinaryObjectException("Conflicting field IDs [type=" +
+ type.Name + ", field1=" + idMap[fieldId] + ", field2=" + fieldName +
+ ", fieldId=" + fieldId + ']');
+ }
+
+ idMap[fieldId] = fieldName;
+ }
+
+ fields.Sort(Compare);
+
+ Descriptor desc = new Descriptor(fields, _rawMode);
+
+ _types[type] = desc;
+ }
+
+ /// <summary>
+ /// Gets the descriptor for an object.
+ /// </summary>
+ private Descriptor GetDescriptor(object obj)
+ {
+ var type = obj.GetType();
+
+ Descriptor desc;
+
+ if (!_types.TryGetValue(type, out desc))
+ throw new BinaryObjectException("Type is not registered in serializer: " + type.Name);
+
+ return desc;
+ }
+
+ /// <summary>
+ /// Compare two FieldInfo instances.
+ /// </summary>
+ private static int Compare(FieldInfo info1, FieldInfo info2) {
+ string name1 = BinaryUtils.CleanFieldName(info1.Name);
+ string name2 = BinaryUtils.CleanFieldName(info2.Name);
+
+ return string.Compare(name1, name2, StringComparison.OrdinalIgnoreCase);
+ }
+
+ /// <summary>
+ /// Type descriptor.
+ /// </summary>
+ private class Descriptor
+ {
+ /** Write actions to be performed. */
+ private readonly List<BinaryReflectiveWriteAction> _wActions;
+
+ /** Read actions to be performed. */
+ private readonly List<BinaryReflectiveReadAction> _rActions;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="fields">Fields.</param>
+ /// <param name="raw">Raw mode.</param>
+ public Descriptor(List<FieldInfo> fields, bool raw)
+ {
+ _wActions = new List<BinaryReflectiveWriteAction>(fields.Count);
+ _rActions = new List<BinaryReflectiveReadAction>(fields.Count);
+
+ foreach (FieldInfo field in fields)
+ {
+ BinaryReflectiveWriteAction writeAction;
+ BinaryReflectiveReadAction readAction;
+
+ BinaryReflectiveActions.GetTypeActions(field, out writeAction, out readAction, raw);
+
+ _wActions.Add(writeAction);
+ _rActions.Add(readAction);
+ }
+ }
+
+ /// <summary>
+ /// Write object.
+ /// </summary>
+ /// <param name="obj">Object.</param>
+ /// <param name="writer">Writer.</param>
+ public void Write(object obj, IBinaryWriter writer)
+ {
+ int cnt = _wActions.Count;
+
+ for (int i = 0; i < cnt; i++)
+ _wActions[i](obj, writer);
+ }
+
+ /// <summary>
+ /// Read object.
+ /// </summary>
+ /// <param name="obj">Object.</param>
+ /// <param name="reader">Reader.</param>
+ public void Read(object obj, IBinaryReader reader)
+ {
+ int cnt = _rActions.Count;
+
+ for (int i = 0; i < cnt; i++ )
+ _rActions[i](obj, reader);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
index 5df5ea1..4581a04 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
@@ -91,7 +91,7 @@ namespace Apache.Ignite.Core.Discovery.Tcp.Multicast
/// Gets or sets the time to live for multicast packets sent out on this
/// IP finder in order to control the scope of the multicast.
/// </summary>
- public byte? TimeToLive { get; set; } // TODO: Nullable?
+ public byte? TimeToLive { get; set; }
/// <summary>
/// Initializes a new instance of the <see cref="TcpDiscoveryMulticastIpFinder"/> class.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
index 15509fc..8b5e2a1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
@@ -49,29 +49,53 @@ namespace Apache.Ignite.Core.Impl.Binary
private static readonly MethodInfo MthdReadEnum =
typeof(IBinaryReader).GetMethod("ReadEnum", new[] { typeof(string) });
+ /** Method: read enum. */
+ private static readonly MethodInfo MthdReadEnumRaw = typeof (IBinaryRawReader).GetMethod("ReadEnum");
+
/** Method: read enum array. */
private static readonly MethodInfo MthdReadEnumArray =
typeof(IBinaryReader).GetMethod("ReadEnumArray", new[] { typeof(string) });
+ /** Method: read enum array. */
+ private static readonly MethodInfo MthdReadEnumArrayRaw = typeof(IBinaryRawReader).GetMethod("ReadEnumArray");
+
/** Method: read array. */
private static readonly MethodInfo MthdReadObjArray =
typeof(IBinaryReader).GetMethod("ReadArray", new[] { typeof(string) });
+ /** Method: read array. */
+ private static readonly MethodInfo MthdReadObjArrayRaw = typeof(IBinaryRawReader).GetMethod("ReadArray");
+
/** Method: read object. */
private static readonly MethodInfo MthdReadObj=
typeof(IBinaryReader).GetMethod("ReadObject", new[] { typeof(string) });
+ /** Method: read object. */
+ private static readonly MethodInfo MthdReadObjRaw = typeof(IBinaryRawReader).GetMethod("ReadObject");
+
/** Method: write enum array. */
- private static readonly MethodInfo MthdWriteEnumArray =
- typeof(IBinaryWriter).GetMethod("WriteEnumArray");
+ private static readonly MethodInfo MthdWriteEnumArray = typeof(IBinaryWriter).GetMethod("WriteEnumArray");
+
+ /** Method: write enum array. */
+ private static readonly MethodInfo MthdWriteEnumArrayRaw = typeof(IBinaryRawWriter).GetMethod("WriteEnumArray");
/** Method: write array. */
- private static readonly MethodInfo MthdWriteObjArray =
- typeof(IBinaryWriter).GetMethod("WriteArray");
+ private static readonly MethodInfo MthdWriteObjArray = typeof(IBinaryWriter).GetMethod("WriteArray");
- /** Method: read object. */
- private static readonly MethodInfo MthdWriteObj =
- typeof(IBinaryWriter).GetMethod("WriteObject");
+ /** Method: write array. */
+ private static readonly MethodInfo MthdWriteObjArrayRaw = typeof(IBinaryRawWriter).GetMethod("WriteArray");
+
+ /** Method: write object. */
+ private static readonly MethodInfo MthdWriteObj = typeof(IBinaryWriter).GetMethod("WriteObject");
+
+ /** Method: write object. */
+ private static readonly MethodInfo MthdWriteObjRaw = typeof(IBinaryRawWriter).GetMethod("WriteObject");
+
+ /** Method: raw writer */
+ private static readonly MethodInfo MthdGetRawWriter = typeof(IBinaryWriter).GetMethod("GetRawWriter");
+
+ /** Method: raw writer */
+ private static readonly MethodInfo MthdGetRawReader = typeof(IBinaryReader).GetMethod("GetRawReader");
/// <summary>
/// Lookup read/write actions for the given type.
@@ -79,17 +103,18 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <param name="field">The field.</param>
/// <param name="writeAction">Write action.</param>
/// <param name="readAction">Read action.</param>
- public static void TypeActions(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
- out BinaryReflectiveReadAction readAction)
+ /// <param name="raw">Raw mode.</param>
+ public static void GetTypeActions(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
+ out BinaryReflectiveReadAction readAction, bool raw)
{
var type = field.FieldType;
if (type.IsPrimitive)
- HandlePrimitive(field, out writeAction, out readAction);
+ HandlePrimitive(field, out writeAction, out readAction, raw);
else if (type.IsArray)
- HandleArray(field, out writeAction, out readAction);
+ HandleArray(field, out writeAction, out readAction, raw);
else
- HandleOther(field, out writeAction, out readAction);
+ HandleOther(field, out writeAction, out readAction, raw);
}
/// <summary>
@@ -98,71 +123,108 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <param name="field">The field.</param>
/// <param name="writeAction">Write action.</param>
/// <param name="readAction">Read action.</param>
+ /// <param name="raw">Raw mode.</param>
/// <exception cref="IgniteException">Unsupported primitive type: + type.Name</exception>
private static void HandlePrimitive(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
- out BinaryReflectiveReadAction readAction)
+ out BinaryReflectiveReadAction readAction, bool raw)
{
var type = field.FieldType;
- if (type == typeof(bool))
+ if (type == typeof (bool))
{
- writeAction = GetWriter<bool>(field, (f, w, o) => w.WriteBoolean(f, o));
- readAction = GetReader(field, (f, r) => r.ReadBoolean(f));
+ writeAction = raw
+ ? GetRawWriter<bool>(field, (w, o) => w.WriteBoolean(o))
+ : GetWriter<bool>(field, (f, w, o) => w.WriteBoolean(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadBoolean())
+ : GetReader(field, (f, r) => r.ReadBoolean(f));
}
- else if (type == typeof(sbyte))
+ else if (type == typeof (sbyte))
{
- writeAction = GetWriter<sbyte>(field, (f, w, o) => w.WriteByte(f, unchecked((byte) o)));
- readAction = GetReader(field, (f, r) => unchecked ((sbyte)r.ReadByte(f)));
+ writeAction = raw
+ ? GetRawWriter<sbyte>(field, (w, o) => w.WriteByte(unchecked((byte) o)))
+ : GetWriter<sbyte>(field, (f, w, o) => w.WriteByte(f, unchecked((byte) o)));
+ readAction = raw
+ ? GetRawReader(field, r => unchecked ((sbyte) r.ReadByte()))
+ : GetReader(field, (f, r) => unchecked ((sbyte) r.ReadByte(f)));
}
- else if (type == typeof(byte))
+ else if (type == typeof (byte))
{
- writeAction = GetWriter<byte>(field, (f, w, o) => w.WriteByte(f, o));
- readAction = GetReader(field, (f, r) => r.ReadByte(f));
+ writeAction = raw
+ ? GetRawWriter<byte>(field, (w, o) => w.WriteByte(o))
+ : GetWriter<byte>(field, (f, w, o) => w.WriteByte(f, o));
+ readAction = raw ? GetRawReader(field, r => r.ReadByte()) : GetReader(field, (f, r) => r.ReadByte(f));
}
- else if (type == typeof(short))
+ else if (type == typeof (short))
{
- writeAction = GetWriter<short>(field, (f, w, o) => w.WriteShort(f, o));
- readAction = GetReader(field, (f, r) => r.ReadShort(f));
+ writeAction = raw
+ ? GetRawWriter<short>(field, (w, o) => w.WriteShort(o))
+ : GetWriter<short>(field, (f, w, o) => w.WriteShort(f, o));
+ readAction = raw ? GetRawReader(field, r => r.ReadShort()) : GetReader(field, (f, r) => r.ReadShort(f));
}
- else if (type == typeof(ushort))
+ else if (type == typeof (ushort))
{
- writeAction = GetWriter<ushort>(field, (f, w, o) => w.WriteShort(f, unchecked((short) o)));
- readAction = GetReader(field, (f, r) => unchecked((ushort) r.ReadShort(f)));
+ writeAction = raw
+ ? GetRawWriter<ushort>(field, (w, o) => w.WriteShort(unchecked((short) o)))
+ : GetWriter<ushort>(field, (f, w, o) => w.WriteShort(f, unchecked((short) o)));
+ readAction = raw
+ ? GetRawReader(field, r => unchecked((ushort) r.ReadShort()))
+ : GetReader(field, (f, r) => unchecked((ushort) r.ReadShort(f)));
}
- else if (type == typeof(char))
+ else if (type == typeof (char))
{
- writeAction = GetWriter<char>(field, (f, w, o) => w.WriteChar(f, o));
- readAction = GetReader(field, (f, r) => r.ReadChar(f));
+ writeAction = raw
+ ? GetRawWriter<char>(field, (w, o) => w.WriteChar(o))
+ : GetWriter<char>(field, (f, w, o) => w.WriteChar(f, o));
+ readAction = raw ? GetRawReader(field, r => r.ReadChar()) : GetReader(field, (f, r) => r.ReadChar(f));
}
- else if (type == typeof(int))
+ else if (type == typeof (int))
{
- writeAction = GetWriter<int>(field, (f, w, o) => w.WriteInt(f, o));
- readAction = GetReader(field, (f, r) => r.ReadInt(f));
+ writeAction = raw
+ ? GetRawWriter<int>(field, (w, o) => w.WriteInt(o))
+ : GetWriter<int>(field, (f, w, o) => w.WriteInt(f, o));
+ readAction = raw ? GetRawReader(field, r => r.ReadInt()) : GetReader(field, (f, r) => r.ReadInt(f));
}
- else if (type == typeof(uint))
+ else if (type == typeof (uint))
{
- writeAction = GetWriter<uint>(field, (f, w, o) => w.WriteInt(f, unchecked((int) o)));
- readAction = GetReader(field, (f, r) => unchecked((uint) r.ReadInt(f)));
+ writeAction = raw
+ ? GetRawWriter<uint>(field, (w, o) => w.WriteInt(unchecked((int) o)))
+ : GetWriter<uint>(field, (f, w, o) => w.WriteInt(f, unchecked((int) o)));
+ readAction = raw
+ ? GetRawReader(field, r => unchecked((uint) r.ReadInt()))
+ : GetReader(field, (f, r) => unchecked((uint) r.ReadInt(f)));
}
- else if (type == typeof(long))
+ else if (type == typeof (long))
{
- writeAction = GetWriter<long>(field, (f, w, o) => w.WriteLong(f, o));
- readAction = GetReader(field, (f, r) => r.ReadLong(f));
+ writeAction = raw
+ ? GetRawWriter<long>(field, (w, o) => w.WriteLong(o))
+ : GetWriter<long>(field, (f, w, o) => w.WriteLong(f, o));
+ readAction = raw ? GetRawReader(field, r => r.ReadLong()) : GetReader(field, (f, r) => r.ReadLong(f));
}
- else if (type == typeof(ulong))
+ else if (type == typeof (ulong))
{
- writeAction = GetWriter<ulong>(field, (f, w, o) => w.WriteLong(f, unchecked((long) o)));
- readAction = GetReader(field, (f, r) => unchecked((ulong) r.ReadLong(f)));
+ writeAction = raw
+ ? GetRawWriter<ulong>(field, (w, o) => w.WriteLong(unchecked((long) o)))
+ : GetWriter<ulong>(field, (f, w, o) => w.WriteLong(f, unchecked((long) o)));
+ readAction = raw
+ ? GetRawReader(field, r => unchecked((ulong) r.ReadLong()))
+ : GetReader(field, (f, r) => unchecked((ulong) r.ReadLong(f)));
}
- else if (type == typeof(float))
+ else if (type == typeof (float))
{
- writeAction = GetWriter<float>(field, (f, w, o) => w.WriteFloat(f, o));
- readAction = GetReader(field, (f, r) => r.ReadFloat(f));
+ writeAction = raw
+ ? GetRawWriter<float>(field, (w, o) => w.WriteFloat(o))
+ : GetWriter<float>(field, (f, w, o) => w.WriteFloat(f, o));
+ readAction = raw ? GetRawReader(field, r => r.ReadFloat()) : GetReader(field, (f, r) => r.ReadFloat(f));
}
- else if (type == typeof(double))
+ else if (type == typeof (double))
{
- writeAction = GetWriter<double>(field, (f, w, o) => w.WriteDouble(f, o));
- readAction = GetReader(field, (f, r) => r.ReadDouble(f));
+ writeAction = raw
+ ? GetRawWriter<double>(field, (w, o) => w.WriteDouble(o))
+ : GetWriter<double>(field, (f, w, o) => w.WriteDouble(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadDouble())
+ : GetReader(field, (f, r) => r.ReadDouble(f));
}
else
throw new IgniteException("Unsupported primitive type: " + type.Name);
@@ -174,96 +236,165 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <param name="field">The field.</param>
/// <param name="writeAction">Write action.</param>
/// <param name="readAction">Read action.</param>
+ /// <param name="raw">Raw mode.</param>
private static void HandleArray(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
- out BinaryReflectiveReadAction readAction)
+ out BinaryReflectiveReadAction readAction, bool raw)
{
Type elemType = field.FieldType.GetElementType();
- if (elemType == typeof(bool))
- {
- writeAction = GetWriter<bool[]>(field, (f, w, o) => w.WriteBooleanArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadBooleanArray(f));
- }
- else if (elemType == typeof(byte))
- {
- writeAction = GetWriter<byte[]>(field, (f, w, o) => w.WriteByteArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadByteArray(f));
- }
- else if (elemType == typeof(sbyte))
- {
- writeAction = GetWriter<sbyte[]>(field, (f, w, o) => w.WriteByteArray(f, (byte[]) (Array) o));
- readAction = GetReader(field, (f, r) => (sbyte[]) (Array) r.ReadByteArray(f));
- }
- else if (elemType == typeof(short))
- {
- writeAction = GetWriter<short[]>(field, (f, w, o) => w.WriteShortArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadShortArray(f));
- }
- else if (elemType == typeof(ushort))
- {
- writeAction = GetWriter<ushort[]>(field, (f, w, o) => w.WriteShortArray(f, (short[]) (Array) o));
- readAction = GetReader(field, (f, r) => (ushort[]) (Array) r.ReadShortArray(f));
- }
- else if (elemType == typeof(char))
- {
- writeAction = GetWriter<char[]>(field, (f, w, o) => w.WriteCharArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadCharArray(f));
- }
- else if (elemType == typeof(int))
- {
- writeAction = GetWriter<int[]>(field, (f, w, o) => w.WriteIntArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadIntArray(f));
- }
- else if (elemType == typeof(uint))
- {
- writeAction = GetWriter<uint[]>(field, (f, w, o) => w.WriteIntArray(f, (int[]) (Array) o));
- readAction = GetReader(field, (f, r) => (uint[]) (Array) r.ReadIntArray(f));
+ if (elemType == typeof (bool))
+ {
+ writeAction = raw
+ ? GetRawWriter<bool[]>(field, (w, o) => w.WriteBooleanArray(o))
+ : GetWriter<bool[]>(field, (f, w, o) => w.WriteBooleanArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadBooleanArray())
+ : GetReader(field, (f, r) => r.ReadBooleanArray(f));
+ }
+ else if (elemType == typeof (byte))
+ {
+ writeAction = raw
+ ? GetRawWriter<byte[]>(field, (w, o) => w.WriteByteArray(o))
+ : GetWriter<byte[]>(field, (f, w, o) => w.WriteByteArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadByteArray())
+ : GetReader(field, (f, r) => r.ReadByteArray(f));
+ }
+ else if (elemType == typeof (sbyte))
+ {
+ writeAction = raw
+ ? GetRawWriter<sbyte[]>(field, (w, o) => w.WriteByteArray((byte[]) (Array) o))
+ : GetWriter<sbyte[]>(field, (f, w, o) => w.WriteByteArray(f, (byte[]) (Array) o));
+ readAction = raw
+ ? GetRawReader(field, r => (sbyte[]) (Array) r.ReadByteArray())
+ : GetReader(field, (f, r) => (sbyte[]) (Array) r.ReadByteArray(f));
+ }
+ else if (elemType == typeof (short))
+ {
+ writeAction = raw
+ ? GetRawWriter<short[]>(field, (w, o) => w.WriteShortArray(o))
+ : GetWriter<short[]>(field, (f, w, o) => w.WriteShortArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadShortArray())
+ : GetReader(field, (f, r) => r.ReadShortArray(f));
+ }
+ else if (elemType == typeof (ushort))
+ {
+ writeAction = raw
+ ? GetRawWriter<ushort[]>(field, (w, o) => w.WriteShortArray((short[]) (Array) o))
+ : GetWriter<ushort[]>(field, (f, w, o) => w.WriteShortArray(f, (short[]) (Array) o));
+ readAction = raw
+ ? GetRawReader(field, r => (ushort[]) (Array) r.ReadShortArray())
+ : GetReader(field, (f, r) => (ushort[]) (Array) r.ReadShortArray(f));
+ }
+ else if (elemType == typeof (char))
+ {
+ writeAction = raw
+ ? GetRawWriter<char[]>(field, (w, o) => w.WriteCharArray(o))
+ : GetWriter<char[]>(field, (f, w, o) => w.WriteCharArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadCharArray())
+ : GetReader(field, (f, r) => r.ReadCharArray(f));
+ }
+ else if (elemType == typeof (int))
+ {
+ writeAction = raw
+ ? GetRawWriter<int[]>(field, (w, o) => w.WriteIntArray(o))
+ : GetWriter<int[]>(field, (f, w, o) => w.WriteIntArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadIntArray())
+ : GetReader(field, (f, r) => r.ReadIntArray(f));
+ }
+ else if (elemType == typeof (uint))
+ {
+ writeAction = raw
+ ? GetRawWriter<uint[]>(field, (w, o) => w.WriteIntArray((int[]) (Array) o))
+ : GetWriter<uint[]>(field, (f, w, o) => w.WriteIntArray(f, (int[]) (Array) o));
+ readAction = raw
+ ? GetRawReader(field, r => (uint[]) (Array) r.ReadIntArray())
+ : GetReader(field, (f, r) => (uint[]) (Array) r.ReadIntArray(f));
+ }
+ else if (elemType == typeof (long))
+ {
+ writeAction = raw
+ ? GetRawWriter<long[]>(field, (w, o) => w.WriteLongArray(o))
+ : GetWriter<long[]>(field, (f, w, o) => w.WriteLongArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadLongArray())
+ : GetReader(field, (f, r) => r.ReadLongArray(f));
+ }
+ else if (elemType == typeof (ulong))
+ {
+ writeAction = raw
+ ? GetRawWriter<ulong[]>(field, (w, o) => w.WriteLongArray((long[]) (Array) o))
+ : GetWriter<ulong[]>(field, (f, w, o) => w.WriteLongArray(f, (long[]) (Array) o));
+ readAction = raw
+ ? GetRawReader(field, r => (ulong[]) (Array) r.ReadLongArray())
+ : GetReader(field, (f, r) => (ulong[]) (Array) r.ReadLongArray(f));
+ }
+ else if (elemType == typeof (float))
+ {
+ writeAction = raw
+ ? GetRawWriter<float[]>(field, (w, o) => w.WriteFloatArray(o))
+ : GetWriter<float[]>(field, (f, w, o) => w.WriteFloatArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadFloatArray())
+ : GetReader(field, (f, r) => r.ReadFloatArray(f));
+ }
+ else if (elemType == typeof (double))
+ {
+ writeAction = raw
+ ? GetRawWriter<double[]>(field, (w, o) => w.WriteDoubleArray(o))
+ : GetWriter<double[]>(field, (f, w, o) => w.WriteDoubleArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadDoubleArray())
+ : GetReader(field, (f, r) => r.ReadDoubleArray(f));
+ }
+ else if (elemType == typeof (decimal?))
+ {
+ writeAction = raw
+ ? GetRawWriter<decimal?[]>(field, (w, o) => w.WriteDecimalArray(o))
+ : GetWriter<decimal?[]>(field, (f, w, o) => w.WriteDecimalArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadDecimalArray())
+ : GetReader(field, (f, r) => r.ReadDecimalArray(f));
+ }
+ else if (elemType == typeof (string))
+ {
+ writeAction = raw
+ ? GetRawWriter<string[]>(field, (w, o) => w.WriteStringArray(o))
+ : GetWriter<string[]>(field, (f, w, o) => w.WriteStringArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadStringArray())
+ : GetReader(field, (f, r) => r.ReadStringArray(f));
+ }
+ else if (elemType == typeof (Guid?))
+ {
+ writeAction = raw
+ ? GetRawWriter<Guid?[]>(field, (w, o) => w.WriteGuidArray(o))
+ : GetWriter<Guid?[]>(field, (f, w, o) => w.WriteGuidArray(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadGuidArray())
+ : GetReader(field, (f, r) => r.ReadGuidArray(f));
}
- else if (elemType == typeof(long))
- {
- writeAction = GetWriter<long[]>(field, (f, w, o) => w.WriteLongArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadLongArray(f));
- }
- else if (elemType == typeof(ulong))
- {
- writeAction = GetWriter<ulong[]>(field, (f, w, o) => w.WriteLongArray(f, (long[]) (Array) o));
- readAction = GetReader(field, (f, r) => (ulong[]) (Array) r.ReadLongArray(f));
- }
- else if (elemType == typeof(float))
- {
- writeAction = GetWriter<float[]>(field, (f, w, o) => w.WriteFloatArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadFloatArray(f));
- }
- else if (elemType == typeof(double))
- {
- writeAction = GetWriter<double[]>(field, (f, w, o) => w.WriteDoubleArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadDoubleArray(f));
- }
- else if (elemType == typeof(decimal?))
- {
- writeAction = GetWriter<decimal?[]>(field, (f, w, o) => w.WriteDecimalArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadDecimalArray(f));
- }
- else if (elemType == typeof(string))
- {
- writeAction = GetWriter<string[]>(field, (f, w, o) => w.WriteStringArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadStringArray(f));
- }
- else if (elemType == typeof(Guid?))
- {
- writeAction = GetWriter<Guid?[]>(field, (f, w, o) => w.WriteGuidArray(f, o));
- readAction = GetReader(field, (f, r) => r.ReadGuidArray(f));
- }
else if (elemType.IsEnum)
{
- writeAction = GetWriter(field, MthdWriteEnumArray, elemType);
- readAction = GetReader(field, MthdReadEnumArray, elemType);
+ writeAction = raw
+ ? GetRawWriter(field, MthdWriteEnumArrayRaw, elemType)
+ : GetWriter(field, MthdWriteEnumArray, elemType);
+ readAction = raw
+ ? GetRawReader(field, MthdReadEnumArrayRaw, elemType)
+ : GetReader(field, MthdReadEnumArray, elemType);
}
else
{
- writeAction = GetWriter(field, MthdWriteObjArray, elemType);
- readAction = GetReader(field, MthdReadObjArray, elemType);
- }
+ writeAction = raw
+ ? GetRawWriter(field, MthdWriteObjArrayRaw, elemType)
+ : GetWriter(field, MthdWriteObjArray, elemType);
+ readAction = raw
+ ? GetRawReader(field, MthdReadObjArrayRaw, elemType)
+ : GetReader(field, MthdReadObjArray, elemType);
+ }
}
/// <summary>
@@ -272,57 +403,84 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <param name="field">The field.</param>
/// <param name="writeAction">Write action.</param>
/// <param name="readAction">Read action.</param>
+ /// <param name="raw">Raw mode.</param>
private static void HandleOther(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
- out BinaryReflectiveReadAction readAction)
+ out BinaryReflectiveReadAction readAction, bool raw)
{
var type = field.FieldType;
var genericDef = type.IsGenericType ? type.GetGenericTypeDefinition() : null;
- bool nullable = genericDef == typeof(Nullable<>);
+ bool nullable = genericDef == typeof (Nullable<>);
var nullableType = nullable ? type.GetGenericArguments()[0] : null;
- if (type == typeof(decimal))
+ if (type == typeof (decimal))
{
- writeAction = GetWriter<decimal>(field, (f, w, o) => w.WriteDecimal(f, o));
- readAction = GetReader(field, (f, r) => r.ReadDecimal(f));
+ writeAction = raw
+ ? GetRawWriter<decimal>(field, (w, o) => w.WriteDecimal(o))
+ : GetWriter<decimal>(field, (f, w, o) => w.WriteDecimal(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadDecimal())
+ : GetReader(field, (f, r) => r.ReadDecimal(f));
}
- else if (type == typeof(string))
+ else if (type == typeof (string))
{
- writeAction = GetWriter<string>(field, (f, w, o) => w.WriteString(f, o));
- readAction = GetReader(field, (f, r) => r.ReadString(f));
+ writeAction = raw
+ ? GetRawWriter<string>(field, (w, o) => w.WriteString(o))
+ : GetWriter<string>(field, (f, w, o) => w.WriteString(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadString())
+ : GetReader(field, (f, r) => r.ReadString(f));
}
- else if (type == typeof(Guid))
+ else if (type == typeof (Guid))
{
- writeAction = GetWriter<Guid>(field, (f, w, o) => w.WriteGuid(f, o));
- readAction = GetReader(field, (f, r) => r.ReadObject<Guid>(f));
+ writeAction = raw
+ ? GetRawWriter<Guid>(field, (w, o) => w.WriteGuid(o))
+ : GetWriter<Guid>(field, (f, w, o) => w.WriteGuid(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadObject<Guid>())
+ : GetReader(field, (f, r) => r.ReadObject<Guid>(f));
}
- else if (nullable && nullableType == typeof(Guid))
+ else if (nullable && nullableType == typeof (Guid))
{
- writeAction = GetWriter<Guid?>(field, (f, w, o) => w.WriteGuid(f, o));
- readAction = GetReader(field, (f, r) => r.ReadGuid(f));
- }
+ writeAction = raw
+ ? GetRawWriter<Guid?>(field, (w, o) => w.WriteGuid(o))
+ : GetWriter<Guid?>(field, (f, w, o) => w.WriteGuid(f, o));
+ readAction = raw ? GetRawReader(field, r => r.ReadGuid()) : GetReader(field, (f, r) => r.ReadGuid(f));
+ }
else if (type.IsEnum)
{
- writeAction = GetWriter<object>(field, (f, w, o) => w.WriteEnum(f, o), true);
- readAction = GetReader(field, MthdReadEnum);
+ writeAction = raw
+ ? GetRawWriter<object>(field, (w, o) => w.WriteEnum(o), true)
+ : GetWriter<object>(field, (f, w, o) => w.WriteEnum(f, o), true);
+ readAction = raw ? GetRawReader(field, MthdReadEnumRaw) : GetReader(field, MthdReadEnum);
}
- else if (type == BinaryUtils.TypDictionary || type.GetInterface(BinaryUtils.TypDictionary.FullName) != null && !type.IsGenericType)
+ else if (type == BinaryUtils.TypDictionary ||
+ type.GetInterface(BinaryUtils.TypDictionary.FullName) != null && !type.IsGenericType)
{
- writeAction = GetWriter<IDictionary>(field, (f, w, o) => w.WriteDictionary(f, o));
- readAction = GetReader(field, (f, r) => r.ReadDictionary(f));
+ writeAction = raw
+ ? GetRawWriter<IDictionary>(field, (w, o) => w.WriteDictionary(o))
+ : GetWriter<IDictionary>(field, (f, w, o) => w.WriteDictionary(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadDictionary())
+ : GetReader(field, (f, r) => r.ReadDictionary(f));
}
- else if (type == BinaryUtils.TypCollection || type.GetInterface(BinaryUtils.TypCollection.FullName) != null && !type.IsGenericType)
+ else if (type == BinaryUtils.TypCollection ||
+ type.GetInterface(BinaryUtils.TypCollection.FullName) != null && !type.IsGenericType)
{
- writeAction = GetWriter<ICollection>(field, (f, w, o) => w.WriteCollection(f, o));
- readAction = GetReader(field, (f, r) => r.ReadCollection(f));
+ writeAction = raw
+ ? GetRawWriter<ICollection>(field, (w, o) => w.WriteCollection(o))
+ : GetWriter<ICollection>(field, (f, w, o) => w.WriteCollection(f, o));
+ readAction = raw
+ ? GetRawReader(field, r => r.ReadCollection())
+ : GetReader(field, (f, r) => r.ReadCollection(f));
}
else
{
- writeAction = GetWriter(field, MthdWriteObj);
- readAction = GetReader(field, MthdReadObj);
- }
+ writeAction = raw ? GetRawWriter(field, MthdWriteObjRaw) : GetWriter(field, MthdWriteObj);
+ readAction = raw ? GetRawReader(field, MthdReadObjRaw) : GetReader(field, MthdReadObj);
+ }
}
/// <summary>
@@ -334,6 +492,7 @@ namespace Apache.Ignite.Core.Impl.Binary
{
Debug.Assert(field != null);
Debug.Assert(field.DeclaringType != null); // non-static
+ Debug.Assert(write != null);
// Get field value
var targetParam = Expression.Parameter(typeof(object));
@@ -353,13 +512,59 @@ namespace Apache.Ignite.Core.Impl.Binary
}
/// <summary>
+ /// Gets the reader with a specified write action.
+ /// </summary>
+ private static BinaryReflectiveWriteAction GetRawWriter<T>(FieldInfo field,
+ Expression<Action<IBinaryRawWriter, T>> write,
+ bool convertFieldValToObject = false)
+ {
+ Debug.Assert(field != null);
+ Debug.Assert(field.DeclaringType != null); // non-static
+ Debug.Assert(write != null);
+
+ // Get field value
+ var targetParam = Expression.Parameter(typeof(object));
+ var targetParamConverted = Expression.Convert(targetParam, field.DeclaringType);
+ Expression fldExpr = Expression.Field(targetParamConverted, field);
+
+ if (convertFieldValToObject)
+ fldExpr = Expression.Convert(fldExpr, typeof (object));
+
+ // Call Writer method
+ var writerParam = Expression.Parameter(typeof(IBinaryWriter));
+ var writeExpr = Expression.Invoke(write, Expression.Call(writerParam, MthdGetRawWriter), fldExpr);
+
+ // Compile and return
+ return Expression.Lambda<BinaryReflectiveWriteAction>(writeExpr, targetParam, writerParam).Compile();
+ }
+
+ /// <summary>
/// Gets the writer with a specified generic method.
/// </summary>
- private static BinaryReflectiveWriteAction GetWriter(FieldInfo field, MethodInfo method,
+ private static BinaryReflectiveWriteAction GetWriter(FieldInfo field, MethodInfo method,
+ params Type[] genericArgs)
+ {
+ return GetWriter0(field, method, false, genericArgs);
+ }
+
+ /// <summary>
+ /// Gets the writer with a specified generic method.
+ /// </summary>
+ private static BinaryReflectiveWriteAction GetRawWriter(FieldInfo field, MethodInfo method,
+ params Type[] genericArgs)
+ {
+ return GetWriter0(field, method, true, genericArgs);
+ }
+
+ /// <summary>
+ /// Gets the writer with a specified generic method.
+ /// </summary>
+ private static BinaryReflectiveWriteAction GetWriter0(FieldInfo field, MethodInfo method, bool raw,
params Type[] genericArgs)
{
Debug.Assert(field != null);
Debug.Assert(field.DeclaringType != null); // non-static
+ Debug.Assert(method != null);
if (genericArgs.Length == 0)
genericArgs = new[] {field.FieldType};
@@ -370,10 +575,14 @@ namespace Apache.Ignite.Core.Impl.Binary
var fldExpr = Expression.Field(targetParamConverted, field);
// Call Writer method
- var writerParam = Expression.Parameter(typeof(IBinaryWriter));
- var fldNameParam = Expression.Constant(BinaryUtils.CleanFieldName(field.Name));
+ var writerParam = Expression.Parameter(typeof (IBinaryWriter));
+
var writeMethod = method.MakeGenericMethod(genericArgs);
- var writeExpr = Expression.Call(writerParam, writeMethod, fldNameParam, fldExpr);
+
+ var writeExpr = raw
+ ? Expression.Call(Expression.Call(writerParam, MthdGetRawWriter), writeMethod, fldExpr)
+ : Expression.Call(writerParam, writeMethod, Expression.Constant(BinaryUtils.CleanFieldName(field.Name)),
+ fldExpr);
// Compile and return
return Expression.Lambda<BinaryReflectiveWriteAction>(writeExpr, targetParam, writerParam).Compile();
@@ -405,9 +614,51 @@ namespace Apache.Ignite.Core.Impl.Binary
}
/// <summary>
+ /// Gets the reader with a specified read action.
+ /// </summary>
+ private static BinaryReflectiveReadAction GetRawReader<T>(FieldInfo field,
+ Expression<Func<IBinaryRawReader, T>> read)
+ {
+ Debug.Assert(field != null);
+ Debug.Assert(field.DeclaringType != null); // non-static
+
+ // Call Reader method
+ var readerParam = Expression.Parameter(typeof(IBinaryReader));
+ Expression readExpr = Expression.Invoke(read, Expression.Call(readerParam, MthdGetRawReader));
+
+ if (typeof(T) != field.FieldType)
+ readExpr = Expression.Convert(readExpr, field.FieldType);
+
+ // Assign field value
+ var targetParam = Expression.Parameter(typeof(object));
+ var assignExpr = Expression.Call(DelegateConverter.GetWriteFieldMethod(field), targetParam, readExpr);
+
+ // Compile and return
+ return Expression.Lambda<BinaryReflectiveReadAction>(assignExpr, targetParam, readerParam).Compile();
+ }
+
+ /// <summary>
/// Gets the reader with a specified generic method.
/// </summary>
- private static BinaryReflectiveReadAction GetReader(FieldInfo field, MethodInfo method,
+ private static BinaryReflectiveReadAction GetReader(FieldInfo field, MethodInfo method,
+ params Type[] genericArgs)
+ {
+ return GetReader0(field, method, false, genericArgs);
+ }
+
+ /// <summary>
+ /// Gets the reader with a specified generic method.
+ /// </summary>
+ private static BinaryReflectiveReadAction GetRawReader(FieldInfo field, MethodInfo method,
+ params Type[] genericArgs)
+ {
+ return GetReader0(field, method, true, genericArgs);
+ }
+
+ /// <summary>
+ /// Gets the reader with a specified generic method.
+ /// </summary>
+ private static BinaryReflectiveReadAction GetReader0(FieldInfo field, MethodInfo method, bool raw,
params Type[] genericArgs)
{
Debug.Assert(field != null);
@@ -418,9 +669,10 @@ namespace Apache.Ignite.Core.Impl.Binary
// Call Reader method
var readerParam = Expression.Parameter(typeof (IBinaryReader));
- var fldNameParam = Expression.Constant(BinaryUtils.CleanFieldName(field.Name));
var readMethod = method.MakeGenericMethod(genericArgs);
- Expression readExpr = Expression.Call(readerParam, readMethod, fldNameParam);
+ Expression readExpr = raw
+ ? Expression.Call(Expression.Call(readerParam, MthdGetRawReader), readMethod)
+ : Expression.Call(readerParam, readMethod, Expression.Constant(BinaryUtils.CleanFieldName(field.Name)));
if (readMethod.ReturnType != field.FieldType)
readExpr = Expression.Convert(readExpr, field.FieldType);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs
deleted file mode 100644
index ceffef5..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Binary
-{
- using System;
- using System.Collections.Generic;
- using System.Reflection;
- using Apache.Ignite.Core.Binary;
-
- /// <summary>
- /// Binary serializer which reflectively writes all fields except of ones with
- /// <see cref="System.NonSerializedAttribute"/>.
- /// <para />
- /// Note that Java platform stores dates as a difference between current time
- /// and predefined absolute UTC date. Therefore, this difference is always the
- /// same for all time zones. .Net, in contrast, stores dates as a difference
- /// between current time and some predefined date relative to the current time
- /// zone. It means that this difference will be different as you change time zones.
- /// To overcome this discrepancy Ignite always converts .Net date to UTC form
- /// before serializing and allows user to decide whether to deserialize them
- /// in UTC or local form using <c>ReadTimestamp(..., true/false)</c> methods in
- /// <see cref="IBinaryReader"/> and <see cref="IBinaryRawReader"/>.
- /// This serializer always read dates in UTC form. It means that if you have
- /// local date in any field/property, it will be implicitly converted to UTC
- /// form after the first serialization-deserialization cycle.
- /// </summary>
- internal class BinaryReflectiveSerializer : IBinarySerializer
- {
- /** Cached binding flags. */
- private const BindingFlags Flags = BindingFlags.Instance | BindingFlags.Public |
- BindingFlags.NonPublic | BindingFlags.DeclaredOnly;
-
- /** Cached type descriptors. */
- private readonly IDictionary<Type, Descriptor> _types = new Dictionary<Type, Descriptor>();
-
- /// <summary>
- /// Write portalbe object.
- /// </summary>
- /// <param name="obj">Object.</param>
- /// <param name="writer">Writer.</param>
- /// <exception cref="BinaryObjectException">Type is not registered in serializer: + type.Name</exception>
- public void WriteBinary(object obj, IBinaryWriter writer)
- {
- var binarizable = obj as IBinarizable;
-
- if (binarizable != null)
- binarizable.WriteBinary(writer);
- else
- GetDescriptor(obj).Write(obj, writer);
- }
-
- /// <summary>
- /// Read binary object.
- /// </summary>
- /// <param name="obj">Instantiated empty object.</param>
- /// <param name="reader">Reader.</param>
- /// <exception cref="BinaryObjectException">Type is not registered in serializer: + type.Name</exception>
- public void ReadBinary(object obj, IBinaryReader reader)
- {
- var binarizable = obj as IBinarizable;
-
- if (binarizable != null)
- binarizable.ReadBinary(reader);
- else
- GetDescriptor(obj).Read(obj, reader);
- }
-
- /// <summary>Register type.</summary>
- /// <param name="type">Type.</param>
- /// <param name="typeId">Type ID.</param>
- /// <param name="converter">Name converter.</param>
- /// <param name="idMapper">ID mapper.</param>
- public void Register(Type type, int typeId, IBinaryNameMapper converter,
- IBinaryIdMapper idMapper)
- {
- if (type.GetInterface(typeof(IBinarizable).Name) != null)
- return;
-
- List<FieldInfo> fields = new List<FieldInfo>();
-
- Type curType = type;
-
- while (curType != null)
- {
- foreach (FieldInfo field in curType.GetFields(Flags))
- {
- if (!field.IsNotSerialized)
- fields.Add(field);
- }
-
- curType = curType.BaseType;
- }
-
- IDictionary<int, string> idMap = new Dictionary<int, string>();
-
- foreach (FieldInfo field in fields)
- {
- string fieldName = BinaryUtils.CleanFieldName(field.Name);
-
- int fieldId = BinaryUtils.FieldId(typeId, fieldName, converter, idMapper);
-
- if (idMap.ContainsKey(fieldId))
- {
- throw new BinaryObjectException("Conflicting field IDs [type=" +
- type.Name + ", field1=" + idMap[fieldId] + ", field2=" + fieldName +
- ", fieldId=" + fieldId + ']');
- }
-
- idMap[fieldId] = fieldName;
- }
-
- fields.Sort(Compare);
-
- Descriptor desc = new Descriptor(fields);
-
- _types[type] = desc;
- }
-
- /// <summary>
- /// Gets the descriptor for an object.
- /// </summary>
- private Descriptor GetDescriptor(object obj)
- {
- var type = obj.GetType();
-
- Descriptor desc;
-
- if (!_types.TryGetValue(type, out desc))
- throw new BinaryObjectException("Type is not registered in serializer: " + type.Name);
-
- return desc;
- }
-
- /// <summary>
- /// Compare two FieldInfo instances.
- /// </summary>
- private static int Compare(FieldInfo info1, FieldInfo info2) {
- string name1 = BinaryUtils.CleanFieldName(info1.Name);
- string name2 = BinaryUtils.CleanFieldName(info2.Name);
-
- return string.Compare(name1, name2, StringComparison.OrdinalIgnoreCase);
- }
-
- /// <summary>
- /// Type descriptor.
- /// </summary>
- private class Descriptor
- {
- /** Write actions to be performed. */
- private readonly List<BinaryReflectiveWriteAction> _wActions;
-
- /** Read actions to be performed. */
- private readonly List<BinaryReflectiveReadAction> _rActions;
-
- /// <summary>
- /// Constructor.
- /// </summary>
- /// <param name="fields">Fields.</param>
- public Descriptor(List<FieldInfo> fields)
- {
- _wActions = new List<BinaryReflectiveWriteAction>(fields.Count);
- _rActions = new List<BinaryReflectiveReadAction>(fields.Count);
-
- foreach (FieldInfo field in fields)
- {
- BinaryReflectiveWriteAction writeAction;
- BinaryReflectiveReadAction readAction;
-
- BinaryReflectiveActions.TypeActions(field, out writeAction, out readAction);
-
- _wActions.Add(writeAction);
- _rActions.Add(readAction);
- }
- }
-
- /// <summary>
- /// Write object.
- /// </summary>
- /// <param name="obj">Object.</param>
- /// <param name="writer">Writer.</param>
- public void Write(object obj, IBinaryWriter writer)
- {
- int cnt = _wActions.Count;
-
- for (int i = 0; i < cnt; i++)
- _wActions[i](obj, writer);
- }
-
- /// <summary>
- /// Read object.
- /// </summary>
- /// <param name="obj">Object.</param>
- /// <param name="reader">Reader.</param>
- public void Read(object obj, IBinaryReader reader)
- {
- int cnt = _rActions.Count;
-
- for (int i = 0; i < cnt; i++ )
- _rActions[i](obj, reader);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
index a7ce544..5b1273e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
@@ -950,10 +950,7 @@ namespace Apache.Ignite.Core.Impl.Binary
{
WriteFieldId(fieldName, BinaryUtils.TypeCollection);
- if (val == null)
- WriteNullField();
- else
- WriteCollection(val);
+ WriteCollection(val);
}
/// <summary>
@@ -962,8 +959,13 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <param name="val">Collection.</param>
public void WriteCollection(ICollection val)
{
- WriteByte(BinaryUtils.TypeCollection);
- BinaryUtils.WriteCollection(val, this);
+ if (val == null)
+ WriteNullField();
+ else
+ {
+ WriteByte(BinaryUtils.TypeCollection);
+ BinaryUtils.WriteCollection(val, this);
+ }
}
/// <summary>
@@ -975,10 +977,7 @@ namespace Apache.Ignite.Core.Impl.Binary
{
WriteFieldId(fieldName, BinaryUtils.TypeDictionary);
- if (val == null)
- WriteNullField();
- else
- WriteDictionary(val);
+ WriteDictionary(val);
}
/// <summary>
@@ -987,8 +986,13 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <param name="val">Dictionary.</param>
public void WriteDictionary(IDictionary val)
{
- WriteByte(BinaryUtils.TypeDictionary);
- BinaryUtils.WriteDictionary(val, this);
+ if (val == null)
+ WriteNullField();
+ else
+ {
+ WriteByte(BinaryUtils.TypeDictionary);
+ BinaryUtils.WriteDictionary(val, this);
+ }
}
/// <summary>
[09/10] ignite git commit: IGNITE-2320 Deprecated
setMarshallerCachePoolSize() and created new
setMarshallerCacheThreadPoolSize() to be consistent with
getMarshallerCacheThreadPoolSize().
Posted by sb...@apache.org.
IGNITE-2320 Deprecated setMarshallerCachePoolSize() and created new setMarshallerCacheThreadPoolSize() to be consistent with getMarshallerCacheThreadPoolSize().
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c5db210
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c5db210
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c5db210
Branch: refs/heads/ignite-1232
Commit: 7c5db2102fb701a24e03f50232a5012122672d32
Parents: baa1312
Author: vsisko <vs...@gridgain.com>
Authored: Fri Mar 4 10:01:59 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Mar 4 10:01:59 2016 +0700
----------------------------------------------------------------------
.../ignite/configuration/IgniteConfiguration.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c5db210/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index f705638..758a2b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -891,8 +891,22 @@ public class IgniteConfiguration {
* @see IgniteConfiguration#getMarshallerCacheThreadPoolSize()
* @see IgniteConfiguration#getMarshallerCacheKeepAliveTime()
* @return {@code this} for chaining.
+ * @deprecated Use {@link #setMarshallerCacheThreadPoolSize(int)} instead.
*/
+ @Deprecated
public IgniteConfiguration setMarshallerCachePoolSize(int poolSize) {
+ return setMarshallerCacheThreadPoolSize(poolSize);
+ }
+
+ /**
+ * Sets default thread pool size that will be used to process marshaller messages.
+ *
+ * @param poolSize Default executor service size to use for marshaller messages.
+ * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize()
+ * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime()
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setMarshallerCacheThreadPoolSize(int poolSize) {
marshCachePoolSize = poolSize;
return this;
[03/10] ignite git commit: IGNITE-799 Fixed Desktop API is not
supported on the current platform.
Posted by sb...@apache.org.
IGNITE-799 Fixed Desktop API is not supported on the current platform.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28ea3433
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28ea3433
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28ea3433
Branch: refs/heads/ignite-1232
Commit: 28ea3433bfbe64cd5166adc29c1badd8d601761e
Parents: cba4f4c
Author: vsisko <vs...@gridgain.com>
Authored: Thu Mar 3 16:17:57 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 3 16:17:57 2016 +0700
----------------------------------------------------------------------
.../ignite/schema/ui/SchemaImportApp.java | 205 ++++++++++++++++++-
1 file changed, 202 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/28ea3433/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
----------------------------------------------------------------------
diff --git a/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java b/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
index 6f9e05b..98ac357 100644
--- a/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
+++ b/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
@@ -17,14 +17,19 @@
package org.apache.ignite.schema.ui;
+import java.awt.Desktop;
import java.io.BufferedInputStream;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
@@ -39,6 +44,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.beans.value.ChangeListener;
@@ -75,6 +82,7 @@ import javafx.stage.FileChooser;
import javafx.stage.Screen;
import javafx.stage.Stage;
import javafx.util.Callback;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.schema.generator.CodeGenerator;
import org.apache.ignite.schema.generator.XmlGenerator;
@@ -119,6 +127,9 @@ public class SchemaImportApp extends Application {
/** Logger. */
private static final Logger log = Logger.getLogger(SchemaImportApp.class.getName());
+ /** Ability to use xdg-open utility flag. */
+ private static final boolean HAS_XDG_OPEN = U.isUnix() && new File("/usr/bin/xdg-open").canExecute();
+
/** Presets for database settings. */
private static class Preset {
/** Name in preferences. */
@@ -661,6 +672,194 @@ public class SchemaImportApp extends Application {
return null;
}
+ /**
+ * Running of command with reading of first printed line.
+ *
+ * @param cmdLine Process to run.
+ * @return First printed by command line.
+ */
+ private String execAndReadLine(Process cmdLine) {
+ InputStream stream = cmdLine.getInputStream();
+ Charset cs = Charset.defaultCharset();
+
+ try(BufferedReader reader = new BufferedReader(
+ cs == null ? new InputStreamReader(stream) : new InputStreamReader(stream, cs))) {
+ return reader.readLine();
+ }
+ catch (IOException ignored){
+ return null;
+ }
+ }
+
+ /**
+ * Start specified command in separate process.
+ *
+ * @param commands Executable file and command parameters in array.
+ * @return Process instance for run command.
+ */
+ private Process startProcess(List<String> commands) throws IOException {
+ ProcessBuilder builder = new ProcessBuilder(commands);
+
+ Map<String, String> environment = builder.environment();
+
+ environment.clear();
+
+ environment.putAll(System.getenv());
+
+ return builder.start();
+ }
+
+ /**
+ * Convert specified command parameters to system specific parameters.
+ *
+ * @param cmd Path to executable file.
+ * @param parameters Params for created process.
+ * @return List of converted system specific parameters.
+ */
+ private List<String> toCommandLine(String cmd, String... parameters) {
+ boolean isWin = U.isWindows();
+
+ List<String> params = new ArrayList<>(parameters.length + 1);
+
+ params.add(cmd.replace('/', File.separatorChar).replace('\\', File.separatorChar));
+
+ for (String parameter: parameters) {
+ if (isWin) {
+ if (parameter.contains("\"")) params.add(parameter.replace("\"", "\\\""));
+ else if (parameter.isEmpty()) params.add("\"\"");
+ else params.add(parameter);
+ }
+ else
+ params.add(parameter);
+ }
+
+ return params;
+ }
+
+ /**
+ * Create process for run specified command.
+ *
+ * @param execPath Path to executable file.
+ * @param params Params for created process.
+ * @return Process instance for run command.
+ */
+ private Process createProcess(String execPath, String... params) throws IOException {
+ if (F.isEmpty(execPath))
+ throw new IllegalArgumentException("Executable not specified");
+
+ return startProcess(toCommandLine(execPath, params));
+ }
+
+ /**
+ * Compare two version strings.
+ *
+ * @param v1 Version string 1.
+ * @param v2 Version string 2.
+ * @return The value 0 if the argument version is equal to this version.
+ * A value less than 0 if this version is less than the version argument.
+ * A value greater than 0 if this version is greater than the version argument.
+ */
+ private int compareVersionNumbers(String v1, String v2) {
+ if (v1 == null && v2 == null)
+ return 0;
+
+ if (v1 == null)
+ return -1;
+
+ if (v2 == null)
+ return 1;
+
+ String[] part1 = v1.split("[._-]");
+ String[] part2 = v2.split("[._-]");
+
+ int idx = 0;
+
+ while (idx < part1.length && idx < part2.length) {
+ String p1 = part1[idx];
+ String p2 = part2[idx];
+
+ int cmp = p1.matches("\\d+") && p2.matches("\\d+") ? new Integer(p1).compareTo(new Integer(p2)) :
+ part1[idx].compareTo(part2[idx]);
+
+ if (cmp != 0)
+ return cmp;
+
+ idx += 1;
+ }
+
+ if (part1.length == part2.length)
+ return 0;
+ else {
+ boolean left = part1.length > idx;
+ String[] parts = left ? part1 : part2;
+
+ while (idx < parts.length) {
+ String p = parts[idx];
+
+ int cmp = p.matches("\\d+") ? new Integer(p).compareTo(0) : 1;
+
+ if (cmp != 0) return left ? cmp : -cmp;
+
+ idx += 1;
+ }
+
+ return 0;
+ }
+ }
+
+ /**
+ * Check that system has Nautilus.
+ * @return {@code True} when Nautilus is installed or {@code false} otherwise.
+ * @throws IOException
+ */
+ private boolean canUseNautilus() throws IOException {
+ if (U.isUnix() || new File("/usr/bin/xdg-mime").canExecute() || new File("/usr/bin/nautilus").canExecute()) {
+ String appName = execAndReadLine(createProcess("xdg-mime", "query", "default", "inode/directory"));
+
+ if (appName == null || !appName.matches("nautilus.*\\.desktop"))
+ return false;
+ else {
+ String ver = execAndReadLine(createProcess("nautilus", "--version"));
+
+ if (ver != null) {
+ Matcher m = Pattern.compile("GNOME nautilus ([0-9.]+)").matcher(ver);
+
+ return m.find() && compareVersionNumbers(m.group(1), "3") >= 0;
+ }
+ else
+ return false;
+ }
+ }
+ else
+ return false;
+ }
+
+ /**
+ * Open specified folder with selection of specified file in system file manager.
+ *
+ * @param dir Opened folder.
+ */
+ private void openFolder(File dir) throws IOException {
+ if (U.isWindows())
+ Runtime.getRuntime().exec("explorer /root," + dir.getAbsolutePath());
+ else if (U.isMacOs())
+ createProcess("open", dir.getAbsolutePath());
+ else if (canUseNautilus())
+ createProcess("nautilus", dir.getAbsolutePath());
+ else {
+ String path = dir.getAbsolutePath();
+
+ if (HAS_XDG_OPEN)
+ createProcess("/usr/bin/xdg-open", path);
+ else if (Desktop.isDesktopSupported() && Desktop.getDesktop().isSupported(Desktop.Action.OPEN))
+ Desktop.getDesktop().open(new File(path));
+ else
+ MessageBox.warningDialog(owner, "This action isn't supported on the current platform" +
+ ((U.isLinux() || U.isUnix() || U.isSolaris()) ?
+ ".\nTo fix this issue you should install library libgnome2-0." : ""));
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void succeeded() {
super.succeeded();
@@ -670,9 +869,9 @@ public class SchemaImportApp extends Application {
if (MessageBox.confirmDialog(owner, "Generation complete!\n\n" +
"Reveal output folder in system default file browser?"))
try {
- java.awt.Desktop.getDesktop().open(destFolder);
+ openFolder(destFolder);
}
- catch (IOException e) {
+ catch (Exception e) {
MessageBox.errorDialog(owner, "Failed to open folder with results.", e);
}
}
@@ -1550,7 +1749,7 @@ public class SchemaImportApp extends Application {
if (customPrefsFile == null)
log.log(Level.WARNING, "Failed to resolve path to file with custom preferences: " +
- customPrefsFile);
+ customPrefsFileName);
else {
Properties customPrefs = new Properties();
[10/10] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-1232
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-1232
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b9682c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b9682c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b9682c1
Branch: refs/heads/ignite-1232
Commit: 4b9682c1887f462ac29a9b679192acb8ed376b48
Parents: 4667293 7c5db21
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 4 11:05:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 4 11:05:40 2016 +0300
----------------------------------------------------------------------
.../ignite/cache/query/ContinuousQuery.java | 35 +
.../configuration/IgniteConfiguration.java | 14 +
.../processors/cache/IgniteCacheProxy.java | 4 +
.../continuous/CacheContinuousQueryHandler.java | 86 ++-
.../CacheContinuousQueryHandlerV2.java | 176 +++++
.../continuous/CacheContinuousQueryManager.java | 238 +++++--
.../continuous/GridContinuousProcessor.java | 7 +-
.../processors/igfs/IgfsMetaManager.java | 215 +++---
.../processors/platform/PlatformContext.java | 9 +-
.../platform/PlatformContextImpl.java | 13 +-
.../platform/PlatformProcessorImpl.java | 14 +-
.../platform/compute/PlatformCompute.java | 23 +-
.../utils/PlatformConfigurationUtils.java | 6 +-
.../IgniteCacheEntryListenerAbstractTest.java | 75 +-
.../cache/IgniteCacheEntryListenerTxTest.java | 5 -
.../GridCacheReplicatedPreloadSelfTest.java | 39 +-
.../CacheContinuousQueryFactoryFilterTest.java | 714 +++++++++++++++++++
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
.../CacheContinuousQueryOperationP2PTest.java | 326 +++++++++
...acheContinuousQueryRandomOperationsTest.java | 63 +-
.../platform/PlatformStartIgniteTask.java | 77 ++
.../ignite/platform/PlatformStopIgniteTask.java | 74 ++
.../p2p/CacheDeploymentEntryEventFilter.java | 33 +
.../CacheDeploymentEntryEventFilterFactory.java | 31 +
.../IgniteCacheQuerySelfTestSuite.java | 4 +
.../Apache.Ignite.Core.Tests.csproj | 1 +
.../Binary/BinarySelfTest.cs | 110 ++-
.../Compute/MixedClusterTest.cs | 123 ++++
.../Config/Compute/compute-grid1.xml | 1 -
.../Config/Compute/compute-grid2.xml | 15 +-
.../IgniteConfigurationTest.cs | 17 +-
.../Apache.Ignite.Core.Tests/TestUtils.cs | 33 +-
.../Apache.Ignite.Core.csproj | 2 +-
.../Binary/BinaryReflectiveSerializer.cs | 241 +++++++
.../Multicast/TcpDiscoveryMulticastIpFinder.cs | 2 +-
.../Impl/Binary/BinaryReflectiveActions.cs | 576 ++++++++++-----
.../Impl/Binary/BinaryReflectiveSerializer.cs | 218 ------
.../Impl/Binary/BinaryWriter.cs | 28 +-
.../ignite/schema/ui/SchemaImportApp.java | 205 +++++-
39 files changed, 3170 insertions(+), 685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9682c1/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
[05/10] ignite git commit: Merge remote-tracking branch
'origin/master'
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bcb8b52b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bcb8b52b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bcb8b52b
Branch: refs/heads/ignite-1232
Commit: bcb8b52b7638bb9cc9be6a3d732a59ef5f4b9494
Parents: b0e85fd 28ea343
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 3 12:57:12 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 12:57:12 2016 +0300
----------------------------------------------------------------------
.../ignite/schema/ui/SchemaImportApp.java | 205 ++++++++++++++++++-
1 file changed, 202 insertions(+), 3 deletions(-)
----------------------------------------------------------------------