You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/04 09:06:07 UTC

[01/10] ignite git commit: IGNITE-2621: Correct handling for tasks in mixed-platform cluster. This closes #505.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 4667293cf -> 4b9682c18


IGNITE-2621: Correct handling for tasks in mixed-platform cluster. This closes #505.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d38eb8f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d38eb8f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d38eb8f

Branch: refs/heads/ignite-1232
Commit: 2d38eb8fe2567bba86a736a6cec7bb85808d1d60
Parents: ee01b61
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Mar 3 09:52:13 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 09:52:13 2016 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java    |   9 +-
 .../platform/PlatformContextImpl.java           |  13 +-
 .../platform/PlatformProcessorImpl.java         |  14 +--
 .../platform/compute/PlatformCompute.java       |  23 +++-
 .../platform/PlatformStartIgniteTask.java       |  77 ++++++++++++
 .../ignite/platform/PlatformStopIgniteTask.java |  74 +++++++++++
 .../Apache.Ignite.Core.Tests.csproj             |   1 +
 .../Compute/MixedClusterTest.cs                 | 123 +++++++++++++++++++
 .../Config/Compute/compute-grid1.xml            |   1 -
 .../Config/Compute/compute-grid2.xml            |  15 ++-
 .../IgniteConfigurationTest.cs                  |  17 +--
 .../Apache.Ignite.Core.Tests/TestUtils.cs       |  33 ++++-
 12 files changed, 365 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index b05d331..e88d57b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -272,4 +272,11 @@ public interface PlatformContext {
      * @return Cluster node filter.
      */
     public PlatformClusterNodeFilter createClusterNodeFilter(Object filter);
-}
+
+    /**
+     * Gets the current platform name.
+     *
+     * @return Current platform name.
+     */
+    public String platform();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
index c531718..b45414a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
@@ -108,6 +108,9 @@ public class PlatformContextImpl implements PlatformContext {
     /** Node ids that has been sent to native platform. */
     private final Set<UUID> sentNodes = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
 
+    /** Platform name. */
+    private final String platform;
+
     /**
      * Static initializer.
      */
@@ -142,11 +145,14 @@ public class PlatformContextImpl implements PlatformContext {
      * @param ctx Kernal context.
      * @param gate Callback gateway.
      * @param mem Memory manager.
+     * @param platform Platform name.
      */
-    public PlatformContextImpl(GridKernalContext ctx, PlatformCallbackGateway gate, PlatformMemoryManagerImpl mem) {
+    public PlatformContextImpl(GridKernalContext ctx, PlatformCallbackGateway gate, PlatformMemoryManagerImpl mem,
+        String platform) {
         this.ctx = ctx;
         this.gate = gate;
         this.mem = mem;
+        this.platform = platform;
 
         cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
 
@@ -621,6 +627,11 @@ public class PlatformContextImpl implements PlatformContext {
         return new PlatformClusterNodeFilterImpl(filter, this);
     }
 
+    /** {@inheritDoc} */
+    @Override public String platform() {
+        return platform;
+    }
+
     /**
      * Metadata holder.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
index 76967ff..95daa4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
@@ -26,11 +26,9 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.PlatformConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteComputeImpl;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
@@ -122,7 +120,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
                 U.warn(log, w);
         }
 
-        platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory());
+        platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory(), interopCfg.platform());
     }
 
     /** {@inheritDoc} */
@@ -209,12 +207,12 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
     }
 
     /** {@inheritDoc} */
-    public void releaseStart() {
+    @Override public void releaseStart() {
         startLatch.countDown();
     }
 
     /** {@inheritDoc} */
-    public void awaitStart() throws IgniteCheckedException {
+    @Override public void awaitStart() throws IgniteCheckedException {
         U.await(startLatch);
     }
 
@@ -305,9 +303,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
     @Override public PlatformTarget compute(PlatformTarget grp) {
         PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
 
-        assert grp0.projection() instanceof ClusterGroupAdapter; // Safety for very complex ClusterGroup hierarchy.
-
-        return new PlatformCompute(platformCtx, (IgniteComputeImpl)((ClusterGroupAdapter)grp0.projection()).compute());
+        return new PlatformCompute(platformCtx, grp0.projection());
     }
 
     /** {@inheritDoc} */
@@ -343,7 +339,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
 
         try {
             if (stopped)
-                throw new IgniteCheckedException("Failed to initialize interop store becuase node is stopping: " +
+                throw new IgniteCheckedException("Failed to initialize interop store because node is stopping: " +
                     store);
 
             if (started)

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 10545d5..a1a82ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.compute;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.internal.IgniteComputeImpl;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -65,18 +67,25 @@ public class PlatformCompute extends PlatformAbstractTarget {
     /** Compute instance. */
     private final IgniteComputeImpl compute;
 
+    /** Compute instance for platform-only nodes. */
+    private final IgniteComputeImpl computeForPlatform;
+
     /** Future for previous asynchronous operation. */
     protected ThreadLocal<IgniteInternalFuture> curFut = new ThreadLocal<>();
     /**
      * Constructor.
      *
      * @param platformCtx Context.
-     * @param compute Compute instance.
+     * @param grp Cluster group.
      */
-    public PlatformCompute(PlatformContext platformCtx, IgniteComputeImpl compute) {
+    public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp) {
         super(platformCtx);
 
-        this.compute = compute;
+        compute = (IgniteComputeImpl)grp.ignite().compute(grp);
+
+        ClusterGroup platformGrp = grp.forAttribute(PlatformUtils.ATTR_PLATFORM, platformCtx.platform());
+
+        computeForPlatform = (IgniteComputeImpl)grp.ignite().compute(platformGrp);
     }
 
     /** {@inheritDoc} */
@@ -153,7 +162,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
                 ((PlatformBalancingMultiClosureTask)task).jobs(jobs);
         }
 
-        platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes());
+        platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, computeForPlatform.clusterGroup().nodes());
 
         return executeNative0(task);
     }
@@ -195,7 +204,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
      * @param topVer Topology version.
      */
     public PlatformListenable executeNative(long taskPtr, long topVer) {
-        final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer);
+        final PlatformFullTask task = new PlatformFullTask(platformCtx, computeForPlatform, taskPtr, topVer);
 
         return executeNative0(task);
     }
@@ -207,6 +216,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
      */
     public void withTimeout(long timeout) {
         compute.withTimeout(timeout);
+        computeForPlatform.withTimeout(timeout);
     }
 
     /**
@@ -214,6 +224,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
      */
     public void withNoFailover() {
         compute.withNoFailover();
+        computeForPlatform.withNoFailover();
     }
 
     /** <inheritDoc /> */
@@ -232,7 +243,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
      * @param task Task.
      */
     private PlatformListenable executeNative0(final PlatformAbstractTask task) {
-        IgniteInternalFuture fut = compute.executeAsync(task, null);
+        IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null);
 
         fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
             private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java
new file mode 100644
index 0000000..c8eb13f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStartIgniteTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task to start an Ignite node.
+ */
+public class PlatformStartIgniteTask extends ComputeTaskAdapter<String, String> {
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable String arg) throws IgniteException {
+        return Collections.singletonMap(new PlatformStartIgniteJob(arg), F.first(subgrid));
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String reduce(List<ComputeJobResult> results) throws IgniteException {
+        ComputeJobResult res = results.get(0);
+
+        if (res.getException() != null)
+            throw res.getException();
+        else
+            return results.get(0).getData();
+    }
+
+    /**
+     * Job.
+     */
+    private static class PlatformStartIgniteJob extends ComputeJobAdapter {
+        /** */
+        private final String springConfig;
+
+        /**
+         * Ctor.
+         * @param springConfig Config.
+         */
+        private PlatformStartIgniteJob(String springConfig) {
+            this.springConfig = springConfig;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            Ignite ignite = Ignition.start(springConfig);
+
+            return ignite.name();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java
new file mode 100644
index 0000000..c0319ab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformStopIgniteTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task to stop an Ignite node.
+ */
+public class PlatformStopIgniteTask extends ComputeTaskAdapter<String, Boolean> {
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable String arg) throws IgniteException {
+        return Collections.singletonMap(new PlatformStopIgniteJob(arg), F.first(subgrid));
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+        ComputeJobResult res = results.get(0);
+
+        if (res.getException() != null)
+            throw res.getException();
+        else
+            return results.get(0).getData();
+    }
+
+    /**
+     * Job.
+     */
+    private static class PlatformStopIgniteJob extends ComputeJobAdapter {
+        /** */
+        private final String gridName;
+
+        /**
+         * Ctor.
+         * @param gridName Name.
+         */
+        private PlatformStopIgniteJob(String gridName) {
+            this.gridName = gridName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            return Ignition.stop(gridName, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 4ba05e1..e72fb85 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -102,6 +102,7 @@
     <Compile Include="Compute\FailoverTaskSelfTest.cs" />
     <Compile Include="Compute\BinarizableClosureTaskTest.cs" />
     <Compile Include="Compute\BinarizableTaskTest.cs" />
+    <Compile Include="Compute\MixedClusterTest.cs" />
     <Compile Include="Compute\ResourceTaskTest.cs" />
     <Compile Include="Compute\SerializableClosureTaskTest.cs" />
     <Compile Include="Compute\TaskAdapterTest.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs
new file mode 100644
index 0000000..57ea892
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/MixedClusterTest.cs
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma warning disable 618  // SpringConfigUrl
+namespace Apache.Ignite.Core.Tests.Compute
+{
+    using System;
+    using System.Collections;
+    using System.Linq;
+    using Apache.Ignite.Core.Compute;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests compute in a cluster with Java-only and .NET nodes.
+    /// </summary>
+    public class MixedClusterTest
+    {
+        /** */
+        private const string SpringConfig = @"Config\Compute\compute-grid1.xml";
+
+        /** */
+        private const string SpringConfig2 = @"Config\Compute\compute-grid2.xml";
+
+        /** */
+        private const string StartTask = "org.apache.ignite.platform.PlatformStartIgniteTask";
+
+        /** */
+        private const string StopTask = "org.apache.ignite.platform.PlatformStopIgniteTask";
+
+        /// <summary>
+        /// Tests the compute.
+        /// </summary>
+        [Test]
+        public void TestCompute()
+        {
+            var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) {SpringConfigUrl = SpringConfig};
+
+            using (var ignite = Ignition.Start(cfg))
+            {
+                var javaNodeName = StartJavaNode(ignite, SpringConfig2);
+
+                try
+                {
+                    Assert.IsTrue(ignite.WaitTopology(2));
+
+                    TestDotNetTask(ignite);
+                    TestJavaTask(ignite);
+                }
+                finally
+                {
+                    StopJavaNode(ignite, javaNodeName);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests the dot net task.
+        /// </summary>
+        /// <param name="ignite">The ignite.</param>
+        private static void TestDotNetTask(IIgnite ignite)
+        {
+            var results = ignite.GetCompute().Broadcast(new ComputeFunc());
+
+            // There are two nodes, but only one can execute .NET jobs.
+            Assert.AreEqual(new[] {int.MaxValue}, results.ToArray());
+        }
+
+        /// <summary>
+        /// Tests the Java task.
+        /// </summary>
+        /// <param name="ignite">The ignite.</param>
+        private static void TestJavaTask(IIgnite ignite)
+        {
+            // Java task can execute on both nodes.
+            var res = ignite.GetCompute().ExecuteJavaTask<ICollection>(ComputeApiTest.BroadcastTask, null);
+
+            Assert.AreEqual(2, res.Count);
+        }
+
+        /// <summary>
+        /// Starts the java node.
+        /// </summary>
+        private static string StartJavaNode(IIgnite grid, string config)
+        {
+            return grid.GetCompute().ExecuteJavaTask<string>(StartTask, config);
+        }
+
+        /// <summary>
+        /// Stops the java node.
+        /// </summary>
+        private static void StopJavaNode(IIgnite grid, string name)
+        {
+            grid.GetCompute().ExecuteJavaTask<object>(StopTask, name);
+        }
+
+        /// <summary>
+        /// Test func.
+        /// </summary>
+        [Serializable]
+        private class ComputeFunc : IComputeFunc<int>
+        {
+            /** <inheritdoc /> */
+            public int Invoke()
+            {
+                return int.MaxValue;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
index 3061773..78a30a8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid1.xml
@@ -69,7 +69,6 @@
                         </bean>
                     </list>
                 </property>
-
             </bean>
         </property>
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
index ef29a89..b1e8235 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Compute/compute-grid2.xml
@@ -62,10 +62,23 @@
                         </bean>
                     </list>
                 </property>
-
+                <property name="idMapper">
+                    <bean class="org.apache.ignite.binary.BinaryBasicIdMapper">
+                        <constructor-arg value="true"/>
+                    </bean>
+                </property>
+                <property name="nameMapper">
+                    <bean class="org.apache.ignite.binary.BinaryBasicNameMapper">
+                        <constructor-arg value="true"/>
+                    </bean>
+                </property>
             </bean>
         </property>
 
+        <property name="marshaller">
+            <bean class="org.apache.ignite.internal.binary.BinaryMarshaller" />
+        </property>
+
         <property name="discoverySpi">
             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                 <property name="ipFinder">

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index 15f5804..3aa26d8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -24,7 +24,6 @@ namespace Apache.Ignite.Core.Tests
     using System.Linq;
     using Apache.Ignite.Core.Cache.Configuration;
     using Apache.Ignite.Core.Common;
-    using Apache.Ignite.Core.Discovery;
     using Apache.Ignite.Core.Discovery.Tcp;
     using Apache.Ignite.Core.Discovery.Tcp.Multicast;
     using Apache.Ignite.Core.Discovery.Tcp.Static;
@@ -138,12 +137,12 @@ namespace Apache.Ignite.Core.Tests
             using (var ignite = Ignition.Start(new IgniteConfiguration
             {
                 Localhost = "127.0.0.1",
-                DiscoverySpi = GetStaticDiscovery()
+                DiscoverySpi = TestUtils.GetStaticDiscovery()
             }))
             using (var ignite2 = Ignition.Start(new IgniteConfiguration
             {
                 Localhost = "127.0.0.1",
-                DiscoverySpi = GetStaticDiscovery(),
+                DiscoverySpi = TestUtils.GetStaticDiscovery(),
                 GridName = "client",
                 ClientMode = true
             }))
@@ -351,17 +350,5 @@ namespace Apache.Ignite.Core.Tests
                 Localhost = "127.0.0.1"
             };
         }
-
-        /// <summary>
-        /// Gets the static discovery.
-        /// </summary>
-        /// <returns></returns>
-        private static IDiscoverySpi GetStaticDiscovery()
-        {
-            return new TcpDiscoverySpi
-            {
-                IpFinder = new TcpDiscoveryStaticIpFinder {Endpoints = new[] {"127.0.0.1:47500", "127.0.0.1:47501"}}
-            };
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d38eb8f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
index f972cf7..9fe8f1c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
@@ -22,6 +22,9 @@ namespace Apache.Ignite.Core.Tests
     using System.Collections.Generic;
     using System.Linq;
     using System.Threading;
+    using Apache.Ignite.Core.Discovery;
+    using Apache.Ignite.Core.Discovery.Tcp;
+    using Apache.Ignite.Core.Discovery.Tcp.Static;
     using Apache.Ignite.Core.Impl;
     using Apache.Ignite.Core.Impl.Common;
     using Apache.Ignite.Core.Tests.Process;
@@ -212,7 +215,7 @@ namespace Apache.Ignite.Core.Tests
         /// <returns>
         ///   <c>True</c> if topology took required size.
         /// </returns>
-        public static bool WaitTopology(this IIgnite grid, int size, int timeout)
+        public static bool WaitTopology(this IIgnite grid, int size, int timeout = 30000)
         {
             int left = timeout;
 
@@ -302,5 +305,33 @@ namespace Apache.Ignite.Core.Tests
 
             return false;
         }
+
+        /// <summary>
+        /// Gets the static discovery.
+        /// </summary>
+        public static IDiscoverySpi GetStaticDiscovery()
+        {
+            return new TcpDiscoverySpi
+            {
+                IpFinder = new TcpDiscoveryStaticIpFinder
+                {
+                    Endpoints = new[] { "127.0.0.1:47500", "127.0.0.1:47501" }
+                }
+            };
+        }
+
+        /// <summary>
+        /// Gets the default code-based test configuration.
+        /// </summary>
+        public static IgniteConfiguration GetTestConfiguration()
+        {
+            return new IgniteConfiguration
+            {
+                DiscoverySpi = GetStaticDiscovery(),
+                Localhost = "127.0.0.1",
+                JvmOptions = TestJavaOptions(),
+                JvmClasspath = CreateTestClasspath()
+            };
+        }
     }
 }


[04/10] ignite git commit: IGFS: Minor refactoring.

Posted by sb...@apache.org.
IGFS: Minor refactoring.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0e85fda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0e85fda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0e85fda

Branch: refs/heads/ignite-1232
Commit: b0e85fdab48738b5db84554b52ae7978aa87ed95
Parents: cba4f4c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 3 12:56:55 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 12:56:55 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 55 +++++++++++---------
 1 file changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0e85fda/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index a149b31..0ba78c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -480,7 +480,7 @@ public class IgfsMetaManager extends IgfsManager {
                 assert validTxState(false);
                 assert fileId != null;
 
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                IgniteInternalTx tx = startTx();
 
                 try {
                     // Lock file ID for this transaction.
@@ -494,7 +494,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                     IgfsFileInfo newInfo = lockInfo(oldInfo, isDeleteLock);
 
-                    boolean put = metaCache.replace(fileId, oldInfo, newInfo);
+                    boolean put = id2InfoPrj.replace(fileId, oldInfo, newInfo);
 
                     assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
 
@@ -571,7 +571,7 @@ public class IgfsMetaManager extends IgfsManager {
                 final boolean interrupted = Thread.interrupted();
 
                 try {
-                    IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() {
+                    IgfsUtils.doInTransactionWithRetries(id2InfoPrj, new IgniteOutClosureX<Void>() {
                         @Override public Void applyx() throws IgniteCheckedException {
                             assert validTxState(true);
 
@@ -591,7 +591,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                             IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
 
-                            boolean put = metaCache.put(fileId, newInfo);
+                            boolean put = id2InfoPrj.put(fileId, newInfo);
 
                             assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo
                                     + ']';
@@ -847,8 +847,6 @@ public class IgfsMetaManager extends IgfsManager {
         if (!id2InfoPrj.putIfAbsent(fileId, newFileInfo))
             throw fsException("Failed to add file details into cache: " + newFileInfo);
 
-        assert metaCache.get(parentId) != null;
-
         id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new IgfsListingEntry(newFileInfo), false));
 
         return null;
@@ -890,7 +888,7 @@ public class IgfsMetaManager extends IgfsManager {
                 }
 
                 // 2. Start transaction.
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                IgniteInternalTx tx = startTx();
 
                 try {
                     // 3. Obtain the locks.
@@ -1094,9 +1092,6 @@ public class IgfsMetaManager extends IgfsManager {
                 " directory (file already exists) [fileId=" + fileId + ", destFileName=" + destFileName +
                 ", destParentId=" + destParentId + ", destEntry=" + destEntry + ']'));
 
-        assert metaCache.get(srcParentId) != null;
-        assert metaCache.get(destParentId) != null;
-
         // Remove listing entry from the source parent listing.
         id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
 
@@ -1116,7 +1111,7 @@ public class IgfsMetaManager extends IgfsManager {
             try {
                 assert validTxState(false);
 
-                final IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                final IgniteInternalTx tx = startTx();
 
                 try {
                     // NB: We may lock root because its id is less than any other id:
@@ -1197,7 +1192,7 @@ public class IgfsMetaManager extends IgfsManager {
                 boolean added = allIds.add(TRASH_ID);
                 assert added;
 
-                final IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                final IgniteInternalTx tx = startTx();
 
                 try {
                     final Map<IgniteUuid, IgfsFileInfo> infoMap = lockIds(allIds);
@@ -1360,7 +1355,7 @@ public class IgfsMetaManager extends IgfsManager {
                 assert listing != null;
                 assert validTxState(false);
 
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                IgniteInternalTx tx = startTx();
 
                 try {
                     Collection<IgniteUuid> res = new HashSet<>();
@@ -1449,7 +1444,7 @@ public class IgfsMetaManager extends IgfsManager {
             try {
                 assert validTxState(false);
 
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                IgniteInternalTx tx = startTx();
 
                 try {
                     boolean res = false;
@@ -1630,7 +1625,7 @@ public class IgfsMetaManager extends IgfsManager {
             try {
                 assert validTxState(false);
 
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                IgniteInternalTx tx = startTx();
 
                 try {
                     IgfsFileInfo info = updatePropertiesNonTx(parentId, fileId, fileName, props);
@@ -1700,8 +1695,7 @@ public class IgfsMetaManager extends IgfsManager {
                 if (log.isDebugEnabled())
                     log.debug("Update file info [fileId=" + fileId + ", c=" + c + ']');
 
-                IgniteInternalTx tx = metaCache.isLockedByThread(fileId) ? null : metaCache.txStartEx(PESSIMISTIC,
-                    REPEATABLE_READ);
+                IgniteInternalTx tx = id2InfoPrj.isLockedByThread(fileId) ? null : startTx();
 
                 try {
                     // Lock file ID for this transaction.
@@ -1724,7 +1718,7 @@ public class IgfsMetaManager extends IgfsManager {
                         throw fsException("Failed to update file info (file types differ)" +
                             " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
 
-                    boolean b = metaCache.replace(fileId, oldInfo, newInfo);
+                    boolean b = id2InfoPrj.replace(fileId, oldInfo, newInfo);
 
                     assert b : "Inconsistent transaction state [oldInfo=" + oldInfo + ", newInfo=" + newInfo +
                         ", c=" + c + ']';
@@ -1771,7 +1765,7 @@ public class IgfsMetaManager extends IgfsManager {
                     b = new DirectoryChainBuilder(path, props, props);
 
                     // Start TX.
-                    IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                    IgniteInternalTx tx = startTx();
 
                     try {
                         final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet);
@@ -1856,7 +1850,7 @@ public class IgfsMetaManager extends IgfsManager {
             try {
                 validTxState(false);
 
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                IgniteInternalTx tx = startTx();
 
                 try {
                     Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling);
@@ -2120,7 +2114,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                             assert lockedInfo != null; // We checked the lock above.
 
-                            boolean put = metaCache.put(info.id(), lockedInfo);
+                            boolean put = id2InfoPrj.put(info.id(), lockedInfo);
 
                             assert put;
 
@@ -2707,7 +2701,7 @@ public class IgfsMetaManager extends IgfsManager {
                 pathIds.add(fileIds(path));
 
             // Start pessimistic.
-            IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+            IgniteInternalTx tx = startTx();
 
             try {
                 // Lock the very first existing parents and possibly the leaf as well.
@@ -2909,16 +2903,25 @@ public class IgfsMetaManager extends IgfsManager {
      * @return Transaction state is correct.
      */
     private boolean validTxState(boolean inTx) {
-        boolean txState = inTx == (metaCache.tx() != null);
+        boolean txState = inTx == (id2InfoPrj.tx() != null);
 
         assert txState : (inTx ? "Method cannot be called outside transaction " :
-            "Method cannot be called in transaction ") + "[tx=" + metaCache.tx() + ", threadId=" +
+            "Method cannot be called in transaction ") + "[tx=" + id2InfoPrj.tx() + ", threadId=" +
             Thread.currentThread().getId() + ']';
 
         return txState;
     }
 
     /**
+     * Start transaction on meta cache.
+     *
+     * @return Transaction.
+     */
+    private IgniteInternalTx startTx() {
+        return metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
      * Updates last access and last modification times.
      *
      * @param parentId File parent ID.
@@ -2935,7 +2938,7 @@ public class IgfsMetaManager extends IgfsManager {
                 assert validTxState(false);
 
                 // Start pessimistic transaction.
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                IgniteInternalTx tx = startTx();
 
                 try {
                     Map<IgniteUuid, IgfsFileInfo> infoMap = lockIds(fileId, parentId);
@@ -3401,7 +3404,7 @@ public class IgfsMetaManager extends IgfsManager {
                     };
 
                     // Start Tx:
-                    IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                    IgniteInternalTx tx = startTx();
 
                     try {
                         if (overwrite)


[07/10] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created".

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 62ed66f..cdf4ffd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -28,11 +28,14 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
@@ -40,7 +43,9 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -55,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -132,6 +138,51 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testFilterAndFactoryProvided() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+        try {
+            final ContinuousQuery qry = new ContinuousQuery();
+
+            qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() {
+                @Override public CacheEntryEventFilter create() {
+                    return null;
+                }
+            });
+
+            qry.setRemoteFilter(new CacheEntryEventSerializableFilter() {
+                @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
+                    return false;
+                }
+            });
+
+            qry.setLocalListener(new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    // No-op.
+                }
+            });
+
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return cache.query(qry);
+                }
+            }, IgniteException.class, null);
+
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicClient() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -576,7 +627,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param deploy The place where continuous query will be started.
      * @throws Exception If failed.
      */
-    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
         throws Exception {
         ignite(0).createCache(ccfg);
 
@@ -1124,7 +1175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param store If {@code true} configures dummy cache store.
      * @return Cache configuration.
      */
-    private CacheConfiguration<Object, Object> cacheConfiguration(
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
         CacheMode cacheMode,
         int backups,
         CacheAtomicityMode atomicityMode,
@@ -1176,7 +1227,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      *
      */
-    static class QueryTestKey implements Serializable, Comparable {
+    public static class QueryTestKey implements Serializable, Comparable {
         /** */
         private final Integer key;
 
@@ -1219,12 +1270,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      *
      */
-    static class QueryTestValue implements Serializable {
+    public static class QueryTestValue implements Serializable {
         /** */
-        private final Integer val1;
+        protected final Integer val1;
 
         /** */
-        private final String val2;
+        protected final String val2;
 
         /**
          * @param val Value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
new file mode 100644
index 0000000..359dd58
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Event filter for deployment.
+ */
+public class CacheDeploymentEntryEventFilter implements CacheEntryEventFilter<Integer, Integer> {
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt)
+        throws CacheEntryListenerException {
+        return evt.getValue() == null || evt.getValue() % 2 != 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
new file mode 100644
index 0000000..0d6eceb
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
@@ -0,0 +1,31 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+
+/**
+ * Event filter factory for deployment.
+ */
+public class CacheDeploymentEntryEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> {
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter<Integer, Integer> create() {
+        return new CacheDeploymentEntryEventFilter();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 968dbf6..083af1e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -79,12 +79,14 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -225,6 +227,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+        suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+        suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
 


[08/10] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created".

Posted by sb...@apache.org.
Fixed  IGNITE-1186 "Filter is sent instead of factory when continuous query is created".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baa13122
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baa13122
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baa13122

Branch: refs/heads/ignite-1232
Commit: baa131220bf503da0908e4ecfee92966317e209c
Parents: c13339f
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 3 16:21:53 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 3 16:21:53 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/ContinuousQuery.java     |  35 +
 .../processors/cache/IgniteCacheProxy.java      |   4 +
 .../continuous/CacheContinuousQueryHandler.java |  86 ++-
 .../CacheContinuousQueryHandlerV2.java          | 176 +++++
 .../continuous/CacheContinuousQueryManager.java | 238 +++++--
 .../continuous/GridContinuousProcessor.java     |   7 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |  75 +-
 .../cache/IgniteCacheEntryListenerTxTest.java   |   5 -
 .../GridCacheReplicatedPreloadSelfTest.java     |  39 +-
 .../CacheContinuousQueryFactoryFilterTest.java  | 714 +++++++++++++++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../CacheContinuousQueryOperationP2PTest.java   | 326 +++++++++
 ...acheContinuousQueryRandomOperationsTest.java |  63 +-
 .../p2p/CacheDeploymentEntryEventFilter.java    |  33 +
 .../CacheDeploymentEntryEventFilterFactory.java |  31 +
 .../IgniteCacheQuerySelfTestSuite.java          |   4 +
 16 files changed, 1706 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index df1bad3..3ea8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.cache.query;
 
 import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -119,6 +121,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** Remote filter. */
     private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
+    /** Remote filter factory. */
+    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
+
     /** Time interval. */
     private long timeInterval = DFLT_TIME_INTERVAL;
 
@@ -196,7 +201,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @param rmtFilter Key-value filter.
      * @return {@code this} for chaining.
+     *
+     * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
      */
+    @Deprecated
     public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
         this.rmtFilter = rmtFilter;
 
@@ -213,6 +221,33 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     }
 
     /**
+     * Sets optional key-value filter factory. The factory produces a filter that is called before an
+     * entry is sent to the master node.
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
+     * (e.g., synchronization or transactional cache operations), should be executed asynchronously
+     * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     *
+     * @param rmtFilterFactory Key-value filter factory.
+     * @return {@code this} for chaining.
+     */
+    public ContinuousQuery<K, V> setRemoteFilterFactory(
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+        this.rmtFilterFactory = rmtFilterFactory;
+
+        return this;
+    }
+
+    /**
+     * Gets remote filter factory.
+     *
+     * @return Remote filter factory.
+     */
+    public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
+        return rmtFilterFactory;
+    }
+
+    /**
      * Sets time interval.
      * <p>
      * When a cache update happens, entry is first put into a buffer. Entries from buffer will

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 5ed8753..6e8bcbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -565,10 +565,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         if (qry.getLocalListener() == null)
             throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
 
+        if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+            throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
         try {
             final UUID routineId = ctx.continuousQueries().executeQuery(
                 qry.getLocalListener(),
                 qry.getRemoteFilter(),
+                qry.getRemoteFilterFactory(),
                 qry.getPageSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 1938edb..10fbd89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
@@ -168,30 +168,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param topic Topic for ordered messages.
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
-     * @param internal Internal flag.
-     * @param notifyExisting Notify existing flag.
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired events flag.
-     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
-     * @param taskHash Task name hash code.
-     * @param locCache {@code True} if local cache.
-     * @param keepBinary Keep binary flag.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
         Object topic,
         CacheEntryUpdatedListener<K, V> locLsnr,
         CacheEntryEventSerializableFilter<K, V> rmtFilter,
-        boolean internal,
-        boolean notifyExisting,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
-        int taskHash,
-        boolean skipPrimaryCheck,
-        boolean locCache,
-        boolean keepBinary,
         boolean ignoreClsNotFound) {
         assert topic != null;
         assert locLsnr != null;
@@ -200,20 +188,49 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         this.topic = topic;
         this.locLsnr = locLsnr;
         this.rmtFilter = rmtFilter;
-        this.internal = internal;
-        this.notifyExisting = notifyExisting;
         this.oldValRequired = oldValRequired;
         this.sync = sync;
         this.ignoreExpired = ignoreExpired;
-        this.taskHash = taskHash;
-        this.skipPrimaryCheck = skipPrimaryCheck;
-        this.locCache = locCache;
-        this.keepBinary = keepBinary;
         this.ignoreClsNotFound = ignoreClsNotFound;
 
         cacheId = CU.cacheId(cacheName);
     }
 
+    /**
+     * @param internal Internal query.
+     */
+    public void internal(boolean internal) {
+        this.internal = internal;
+    }
+
+    /**
+     * @param notifyExisting Notify existing.
+     */
+    public void notifyExisting(boolean notifyExisting) {
+        this.notifyExisting = notifyExisting;
+    }
+
+    /**
+     * @param locCache Local cache.
+     */
+    public void localCache(boolean locCache) {
+        this.locCache = locCache;
+    }
+
+    /**
+     * @param taskHash Task name hash code.
+     */
+    public void taskNameHash(int taskHash) {
+        this.taskHash = taskHash;
+    }
+
+    /**
+     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
+     */
+    public void skipPrimaryCheck(boolean skipPrimaryCheck) {
+        this.skipPrimaryCheck = skipPrimaryCheck;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isEvents() {
         return false;
@@ -262,8 +279,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (locLsnr != null)
             ctx.resource().injectGeneric(locLsnr);
 
-        if (rmtFilter != null)
-            ctx.resource().injectGeneric(rmtFilter);
+        final CacheEntryEventFilter filter = getEventFilter();
+
+        if (filter != null)
+            ctx.resource().injectGeneric(filter);
 
         entryBufs = new ConcurrentHashMap<>();
 
@@ -303,7 +322,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        rmtFilter,
+                        filter instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)filter : null,
                         null,
                         nodeId,
                         taskName()
@@ -332,9 +352,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                 boolean notify = !evt.entry().isFiltered();
 
-                if (notify && rmtFilter != null) {
+                if (notify && filter != null) {
                     try {
-                        notify = rmtFilter.evaluate(evt);
+                        notify = filter.evaluate(evt);
                     }
                     catch (Exception e) {
                         U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
@@ -422,7 +442,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        rmtFilter,
+                        filter instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)filter : null,
                         null,
                         nodeId,
                         taskName(),
@@ -435,8 +456,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onUnregister() {
-                if (rmtFilter instanceof PlatformContinuousQueryFilter)
-                    ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
+                if (filter instanceof PlatformContinuousQueryFilter)
+                    ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -517,6 +538,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     * @return Cache entry event filter.
+     */
+    public CacheEntryEventFilter getEventFilter() {
+        return rmtFilter;
+    }
+
+    /**
      * @param cctx Context.
      * @param nodeId ID of the node that started routine.
      * @param entry Entry.
@@ -1189,7 +1217,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * Deployable object.
      */
-    private static class DeployableObject implements Externalizable {
+    protected static class DeployableObject implements Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1214,7 +1242,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param ctx Kernal context.
          * @throws IgniteCheckedException In case of error.
          */
-        private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+        protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
             assert obj != null;
             assert ctx != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
new file mode 100644
index 0000000..7aef4dd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query handler, version 2. Holds a {@link Factory} that creates the remote filter.
+ */
+public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHandler<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Remote filter factory. */
+    private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+
+    /** Deployable object for filter factory. */
+    private DeployableObject rmtFilterFactoryDep;
+
+    /** Event types for JCache API. */
+    private byte types;
+
+    /** Filter instance created lazily by {@link #getEventFilter()}; transient, so it is not serialized. */
+    protected transient CacheEntryEventFilter filter;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public CacheContinuousQueryHandlerV2() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheName Cache name.
+     * @param topic Topic for ordered messages.
+     * @param locLsnr Local listener.
+     * @param rmtFilterFactory Remote filter factory.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired events flag.
+     * @param types Event types.
+     */
+    public CacheContinuousQueryHandlerV2(
+        String cacheName,
+        Object topic,
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean ignoreClsNotFound,
+        @Nullable Byte types) {
+        super(cacheName,
+            topic,
+            locLsnr,
+            null,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            ignoreClsNotFound);
+
+        assert rmtFilterFactory != null;
+
+        this.rmtFilterFactory = rmtFilterFactory;
+
+        if (types != null) {
+            assert types != 0;
+
+            this.types = types;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter getEventFilter() {
+        if (filter == null) {
+            assert rmtFilterFactory != null;
+
+            Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory;
+
+            filter = factory.create();
+
+            if (types != 0)
+                filter = new JCacheQueryRemoteFilter(filter, types);
+        }
+
+        return filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
+        super.p2pMarshal(ctx);
+
+        if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
+            rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        super.p2pUnmarshal(nodeId, ctx);
+
+        if (rmtFilterFactoryDep != null)
+            rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridContinuousHandler clone() {
+        return super.clone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryHandlerV2.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        boolean b = rmtFilterFactoryDep != null;
+
+        out.writeBoolean(b);
+
+        if (b)
+            out.writeObject(rmtFilterFactoryDep);
+        else
+            out.writeObject(rmtFilterFactory);
+
+        out.writeByte(types);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        boolean b = in.readBoolean();
+
+        if (b)
+            rmtFilterFactoryDep = (DeployableObject)in.readObject();
+        else
+            rmtFilterFactory = (Factory)in.readObject();
+
+        types = in.readByte();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 409c1da..353043f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -23,15 +23,16 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
@@ -54,10 +55,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE;
 
 /**
  * Continuous queries manager.
@@ -413,28 +416,80 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
-        CacheEntryEventSerializableFilter rmtFilter,
+    public UUID executeQuery(final CacheEntryUpdatedListener locLsnr,
+        @Nullable final CacheEntryEventSerializableFilter rmtFilter,
+        @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
         boolean loc,
-        boolean keepBinary) throws IgniteCheckedException
+        final boolean keepBinary) throws IgniteCheckedException
     {
+        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
+
+        if (rmtFilterFactory != null)
+            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+                    CacheContinuousQueryHandler hnd;
+
+                    if (v2)
+                        hnd = new CacheContinuousQueryHandlerV2(
+                            cctx.name(),
+                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                            locLsnr,
+                            rmtFilterFactory,
+                            true,
+                            false,
+                            true,
+                            false,
+                            null);
+                    else {
+                        CacheEntryEventFilter fltr = rmtFilterFactory.create();
+
+                        if (!(fltr instanceof CacheEntryEventSerializableFilter))
+                            throw new IgniteException("Topology has nodes of the old versions. In this case " +
+                                "EntryEventFilter should implement " +
+                                "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr);
+
+                        hnd = new CacheContinuousQueryHandler(
+                            cctx.name(),
+                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                            locLsnr,
+                            (CacheEntryEventSerializableFilter)fltr,
+                            true,
+                            false,
+                            true,
+                            false);
+                    }
+
+                    return hnd;
+                }
+            };
+        else
+            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean ignore) {
+                    return new CacheContinuousQueryHandler(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locLsnr,
+                        rmtFilter,
+                        true,
+                        false,
+                        true,
+                        false);
+                }
+            };
+
         return executeQuery0(
             locLsnr,
-            rmtFilter,
+            clsr,
             bufSize,
             timeInterval,
             autoUnsubscribe,
             false,
             false,
-            true,
-            false,
-            true,
             loc,
-            keepBinary,
-            false);
+            keepBinary);
     }
 
     /**
@@ -445,27 +500,35 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
-        CacheEntryEventSerializableFilter rmtFilter,
-        boolean loc,
-        boolean notifyExisting,
-        boolean ignoreClassNotFound)
+    public UUID executeInternalQuery(final CacheEntryUpdatedListener<?, ?> locLsnr,
+        final CacheEntryEventSerializableFilter rmtFilter,
+        final boolean loc,
+        final boolean notifyExisting,
+        final boolean ignoreClassNotFound)
         throws IgniteCheckedException
     {
         return executeQuery0(
             locLsnr,
-            rmtFilter,
+            new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) {
+                    return new CacheContinuousQueryHandler(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locLsnr,
+                        rmtFilter,
+                        true,
+                        false,
+                        true,
+                        ignoreClassNotFound);
+                }
+            },
             ContinuousQuery.DFLT_PAGE_SIZE,
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
             true,
             notifyExisting,
-            true,
-            false,
-            true,
             loc,
-            false,
-            ignoreClassNotFound);
+            false);
     }
 
     /**
@@ -539,32 +602,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
     /**
      * @param locLsnr Local listener.
-     * @param rmtFilter Remote filter.
      * @param bufSize Buffer size.
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
      * @param internal Internal flag.
      * @param notifyExisting Notify existing flag.
-     * @param oldValRequired Old value required flag.
-     * @param sync Synchronous flag.
-     * @param ignoreExpired Ignore expired event flag.
      * @param loc Local flag.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
     private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
-        final CacheEntryEventSerializableFilter rmtFilter,
+        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
         boolean internal,
         boolean notifyExisting,
-        boolean oldValRequired,
-        boolean sync,
-        boolean ignoreExpired,
         boolean loc,
-        final boolean keepBinary,
-        boolean ignoreClassNotFound) throws IgniteCheckedException
+        final boolean keepBinary) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -573,21 +628,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
 
-        GridContinuousHandler hnd = new CacheContinuousQueryHandler(
-            cctx.name(),
-            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-            locLsnr,
-            rmtFilter,
-            internal,
-            notifyExisting,
-            oldValRequired,
-            sync,
-            ignoreExpired,
-            taskNameHash,
-            skipPrimaryCheck,
-            cctx.isLocal(),
-            keepBinary,
-            ignoreClassNotFound);
+        boolean v2 = useV2Protocol(cctx.discovery().allNodes());
+
+        final CacheContinuousQueryHandler hnd = clsr.apply(v2);
+
+        hnd.taskNameHash(taskNameHash);
+        hnd.skipPrimaryCheck(skipPrimaryCheck);
+        hnd.notifyExisting(notifyExisting);
+        hnd.internal(internal);
+        hnd.keepBinary(keepBinary);
+        hnd.localCache(cctx.isLocal());
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -654,7 +704,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                                     cctx.kernalContext().cache().jcache(cctx.name()),
                                     cctx, entry);
 
-                                if (rmtFilter != null && !rmtFilter.evaluate(next))
+                                if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
                                     next = null;
                             }
                         }
@@ -667,6 +717,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param nodes Nodes.
+     * @return {@code True} if no node has a version lower than
+     *     {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE}, otherwise {@code false}.
+     */
+    private boolean useV2Protocol(Collection<ClusterNode> nodes) {
+        for (ClusterNode node : nodes) {
+            if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param lsnrId Listener ID.
      * @param lsnr Listener.
      * @param internal Internal flag.
@@ -767,36 +831,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             if (types == 0)
                 throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces.");
 
-            CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
+            final byte types0 = types;
+
+            final CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
                 locLsnrImpl,
                 log);
 
-            CacheEntryEventFilter fltr = null;
-
-            if (cfg.getCacheEntryEventFilterFactory() != null) {
-                fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
-
-                if (!(fltr instanceof Serializable))
-                    throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
-                        + fltr);
-            }
-
-            CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
-
             routineId = executeQuery0(
                 locLsnr,
-                rmtFilter,
+                new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+                    @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+                        CacheContinuousQueryHandler hnd;
+                        Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
+
+                        v2 = rmtFilterFactory != null && v2;
+
+                        if (v2)
+                            hnd = new CacheContinuousQueryHandlerV2(
+                                cctx.name(),
+                                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                                locLsnr,
+                                rmtFilterFactory,
+                                cfg.isOldValueRequired(),
+                                cfg.isSynchronous(),
+                                false,
+                                false,
+                                types0);
+                        else {
+                            JCacheQueryRemoteFilter jCacheFilter;
+
+                            CacheEntryEventFilter filter = null;
+
+                            if (rmtFilterFactory != null) {
+                                filter = rmtFilterFactory.create();
+
+                                if (!(filter instanceof Serializable))
+                                    throw new IgniteException("Topology has nodes of the old versions. " +
+                                        "In this case EntryEventFilter must implement java.io.Serializable " +
+                                        "interface. Filter: " + filter);
+                            }
+
+                            jCacheFilter = new JCacheQueryRemoteFilter(filter, types0);
+
+                            hnd = new CacheContinuousQueryHandler(
+                                cctx.name(),
+                                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                                locLsnr,
+                                jCacheFilter,
+                                cfg.isOldValueRequired(),
+                                cfg.isSynchronous(),
+                                false,
+                                false);
+                        }
+
+                        return hnd;
+                    }
+                },
                 ContinuousQuery.DFLT_PAGE_SIZE,
                 ContinuousQuery.DFLT_TIME_INTERVAL,
                 ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
                 false,
                 false,
-                cfg.isOldValueRequired(),
-                cfg.isSynchronous(),
                 false,
-                false,
-                keepBinary,
-                false);
+                keepBinary
+            );
         }
 
         /**
@@ -814,6 +912,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     *
      */
     private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
         /** */
@@ -896,8 +995,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * For handler version 2.0 this filter should not be serialized.
      */
-    private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
+    protected static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -922,7 +1022,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
          * @param impl Filter.
          * @param types Types.
          */
-        JCacheQueryRemoteFilter(CacheEntryEventFilter impl, byte types) {
+        JCacheQueryRemoteFilter(@Nullable CacheEntryEventFilter impl, byte types) {
             assert types != 0;
 
             this.impl = impl;

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1ec69c2..1776748 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -110,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Threads started by this processor. */
     private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap8<>();
 
+    /** Minimal node version that supports the V2 continuous query handler. */
+    public static final IgniteProductVersion QUERY_MSG_VER_2_SINCE = IgniteProductVersion.fromString("1.5.9");
+
     /** */
     private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
 
@@ -615,7 +619,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         startFuts.put(routineId, fut);
 
         try {
-            if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
+            if (locIncluded
+                && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
                 hnd.onListenerRegistered(routineId, ctx);
 
             ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index e6bfd87..35fbbd5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryExpiredListener;
 import javax.cache.event.CacheEntryListener;
 import javax.cache.event.CacheEntryListenerException;
@@ -99,6 +100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /** */
     private boolean useObjects;
 
+    /** */
+    private static AtomicBoolean serialized = new AtomicBoolean(false);
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -138,6 +142,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
             assertEquals(0, syncMsgFuts.size());
         }
+
+        serialized.set(false);
     }
 
     /**
@@ -178,11 +184,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
                     return new CreateUpdateRemoveExpireListener();
                 }
             },
-            new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
-                @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
-                    return new ExceptionFilter();
-                }
-            },
+            new ExceptionFilterFactory(),
             false,
             false
         );
@@ -443,18 +445,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
             FactoryBuilder.factoryOf(lsnr),
-            null,
+            new SerializableFactory(),
             true,
             false
         ));
 
         try {
             startGrid(gridCount());
+
+            jcache(0).put(1, 1);
         }
         finally {
             stopGrid(gridCount());
         }
 
+        jcache(0).put(2, 2);
+
+        assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get());
         assertFalse(serialized.get());
     }
 
@@ -1130,9 +1137,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+    private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+        @Override public CacheEntryEventFilter<Object, Object> create() {
             return new TestFilter();
         }
     }
@@ -1184,7 +1191,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+    private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             assert evt != null;
@@ -1201,6 +1208,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
             return key % 2 == 0;
         }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Filter must not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Filter must not be unmarshaled.");
+        }
     }
 
     /**
@@ -1355,6 +1372,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     * Factory that creates {@link NonSerializableFilter} instances.
+     */
+    public static class SerializableFactory implements Factory<NonSerializableFilter> {
+        /** {@inheritDoc} */
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
+        }
+    }
+
+    /**
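+     * Filter that accepts every event and sets the {@code serialized} flag if it is ever written or read externally.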
+     */
+    public static class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            serialized.set(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            serialized.set(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+            return true;
+        }
+    }
+
+    /**
      */
     public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
         /** */
@@ -1467,4 +1514,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
             return S.toString(ListenerTestValue.class, this);
         }
     }
+
+    /**
+     * Factory that creates {@link ExceptionFilter} instances.
+     */
+    static class ExceptionFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+            return new ExceptionFilter();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index 41725e7..cad57f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -47,9 +47,4 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
     @Override protected NearCacheConfiguration nearConfiguration() {
         return null;
     }
-
-    /** {@inheritDoc} */
-    @Override public void testEvents(){
-        fail("https://issues.apache.org/jira/browse/IGNITE-1600");
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index ea2f27b..c6cd5af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -403,18 +404,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
                         }
                     }
                 },
-                new Factory<CacheEntryEventSerializableFilter<Integer, Object>>() {
-                    /** {@inheritDoc} */
-                    @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
-                        try {
-
-                            return cls2.newInstance();
-                        }
-                        catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                },
+                new ClassFilterFactory(cls2),
                 true,
                 true
             );
@@ -946,4 +936,29 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             return true;
         }
     }
+
+    /**
+     * Factory that creates a filter by instantiating the given class through its no-arg constructor.
+     */
+    private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, Object>> {
+        /** Filter class to instantiate; assigned once in the constructor. */
+        private final Class<CacheEntryEventSerializableFilter> cls;
+
+        /**
+         * @param cls Filter class; {@link #create()} instantiates it reflectively.
+         */
+        public ClassFilterFactory(Class<CacheEntryEventSerializableFilter> cls) {
+            this.cls = cls;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
+            try {
+                return cls.newInstance();
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
new file mode 100644
index 0000000..6143fa9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Continuous query tests in which the event filter is supplied through a {@link Factory}.
+ */
+public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest {
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 50;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    public static final int ITERATION_CNT = 40;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInternalQuery() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+        UUID uuid = null;
+
+        try {
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            final CountDownLatch latch = new CountDownLatch(5);
+
+            CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    for (Object evt : iterable) {
+                        latch.countDown();
+
+                        log.info("Received event: " + evt);
+                    }
+                }
+            };
+
+            uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+            for (int i = 10; i < 20; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, SECONDS));
+        }
+        finally {
+            if (uuid != null)
+                grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                    .cancelInternalQuery(uuid);
+
+            cache.destroy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+            Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+            Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
+
+            if (deploy == CLIENT)
+                evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+            else if (deploy == SERVER)
+                evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+                    rnd.nextBoolean()));
+            else {
+                boolean isSync = rnd.nextBoolean();
+
+                for (int i = 0; i < NODES - 1; i++)
+                    evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
+            }
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+            try {
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    if (i % 10 == 0)
+                        log.info("Iteration: " + i);
+
+                    for (int idx = 0; idx < NODES; idx++)
+                        randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+                }
+            }
+            finally {
+                for (QueryCursor<?> cur : curs)
+                    cur.close();
+
+                for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+                    grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param nodeIdx Node index.
+     * @param curs Cursors.
+     * @param lsnrCfgs Listener configurations.
+     * @return Event queue
+     */
+    private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+        int nodeIdx,
+        Collection<QueryCursor<?>> curs,
+        Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+        boolean sync) {
+        final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+                new MutableCacheEntryListenerConfiguration<>(
+                    FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+                        @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    }),
+                    new FilterFactory(),
+                    true,
+                    sync
+                );
+
+            grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+            lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+        }
+        else {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+                    for (CacheEntryEvent<?, ?> evt : evts)
+                        evtsQueue.add(evt);
+                }
+            });
+
+            qry.setRemoteFilterFactory(new FilterFactory());
+
+            QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+            curs.add(cur);
+        }
+
+        return evtsQueue;
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param evtsQueues Events queue.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(evtsQueues);
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(evtsQueues);
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        } finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        int val = rnd.nextInt(3);
+
+        if (val == 0)
+            return READ_COMMITTED;
+        else if (val == 1)
+            return REPEATABLE_READ;
+        else
+            return SERIALIZABLE;
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+        int part = aff.partition(key);
+
+        Long partCntr = cntrs.get(part);
+
+        if (partCntr == null)
+            partCntr = 0L;
+
+        cntrs.put(part, ++partCntr);
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        return new QueryTestValue(rnd.nextInt(VALS));
+    }
+
+    /**
+     * @param evtsQueues Event queue.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal)
+        throws Exception {
+        if ((val == null && oldVal == null
+            || (val != null && !isAccepted((QueryTestValue)val)))) {
+            checkNoEvent(evtsQueues);
+
+            return;
+        }
+
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+            assertEquals(key, evt.getKey());
+            assertEquals(val, evt.getValue());
+            assertEquals(oldVal, evt.getOldValue());
+
+            // Keep the counter boxed: unboxing a missing entry would throw NPE before assertNotNull could report it.
+            Long cntr = partCntrs.get(aff.partition(key));
+            CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+            assertNotNull(cntr);
+            assertNotNull(qryEntryEvt);
+            assertEquals((long)cntr, qryEntryEvt.getPartitionUpdateCounter());
+        }
+    }
+
+    /**
+     * @param evtsQueues Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+            assertNull(evt);
+        }
+    }
+
+    /**
+     * Filter that accepts {@code null} and even values; fails the test if it is ever marshaled or unmarshaled.
+     */
+    protected static class NonSerializableFilter
+        implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+            CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
+        /** Public no-arg constructor, as required for {@link Externalizable} classes. */
+        public NonSerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
+            throws CacheEntryListenerException {
+            return isAccepted(event.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            fail("Entry filter should not be unmarshaled.");
+        }
+
+        /**
+         * @return {@code True} if value is {@code null} or its {@code val1} field is even.
+         */
+        public static boolean isAccepted(QueryTestValue val) {
+            return val == null || val.val1 % 2 == 0;
+        }
+    }
+
+    /**
+     * Filter accepting {@code null} or even {@link Integer} values.
+     */
+    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{
+        /** No-arg constructor. */
+        public SerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
+            throws CacheEntryListenerException {
+            return isAccepted(event.getValue());
+        }
+
+        /**
+         * @return {@code True} if value is {@code null} or even.
+         */
+        public static boolean isAccepted(Integer val) {
+            return val == null || val % 2 == 0;
+        }
+    }
+
+    /**
+     * Factory creating a new {@link NonSerializableFilter} on every call.
+     */
+    protected static class FilterFactory implements Factory<NonSerializableFilter> {
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
+        }
+    }
+
+    /**
+     * Listener that forwards every event kind to {@link #onEvents} and throws if it is ever marshaled.
+     */
+    public abstract class LocalNonSerialiseListener implements
+        CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+        CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+        Externalizable {
+        /** No-arg constructor. */
+        public LocalNonSerialiseListener() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /**
+         * @param evts Events passed to any of the four listener callbacks.
+         */
+        protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts);
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index a42f056..f104f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1432,7 +1432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
 
             if (hnd.isQuery() && hnd.cacheName() == null) {
-                backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+                backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue");
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
new file mode 100644
index 0000000..97f9e0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        // Client mode follows the 'client' field, which beforeTest() sets to true before starting the last node.
+        cfg.setClientMode(client);
+        cfg.setPeerClassLoadingEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+        // Nodes 0 .. NODES - 2 are started while the 'client' flag is still false.
+        startGridsMultiThreaded(NODES - 1);
+        // The last node (index NODES - 1) is started with client mode enabled.
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * @param ccfg Cache configuration; the cache is created on node 0.
+     * @param isClient {@code True} to query from node NODES - 1 (the client), {@code false} for a random other node.
+     * @throws Exception If failed.
+     */
+    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        QueryCursor<?> cur = null;
+        // The filter factory class is loaded through the external class loader rather than referenced directly.
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+            (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+        // NOTE(review): listener is attached twice below; 10 depends on what the external filter passes - confirm.
+        final CountDownLatch latch = new CountDownLatch(10);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        TestLocalListener localLsnr = new TestLocalListener() {
+            @Override public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+                throws CacheEntryListenerException {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+                    latch.countDown();
+
+                    log.info("Received event: " + evt);
+                }
+            }
+        };
+
+        MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
+            new MutableCacheEntryListenerConfiguration<>(
+                new FactoryBuilder.SingletonFactory<>(localLsnr),
+                (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
+                    (Object)evtFilterFactory.newInstance(),
+                true,
+                true
+            );
+
+        qry.setLocalListener(localLsnr);
+
+        qry.setRemoteFilterFactory(
+            (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
+
+        IgniteCache<Integer, Integer> cache = null;
+
+        try {
+            if (isClient)
+                cache = grid(NODES - 1).cache(ccfg.getName());
+            else
+                cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
+
+            cur = cache.query(qry);
+
+            cache.registerCacheEntryListener(lsnrCfg);
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, TimeUnit.SECONDS));
+        }
+        finally {
+            if (cur != null)
+                cur.close();
+
+            if (cache != null)
+                cache.deregisterCacheEntryListener(lsnrCfg);
+        }
+    }
+
+    /**
+     * Creates a configuration with FULL_SYNC write synchronization and PRIMARY atomic write order.
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied only when {@code cacheMode} is PARTITIONED.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Listener that hands both created and updated events to {@link #onEvent}.
+     */
+    private static abstract class TestLocalListener implements CacheEntryUpdatedListener<Integer, Integer>,
+        CacheEntryCreatedListener<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+            throws CacheEntryListenerException {
+            onEvent(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+            throws CacheEntryListenerException {
+            onEvent(evts);
+        }
+
+        /**
+         * @param evts Created or updated events.
+         */
+        protected abstract void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts);
+    }
+}


[06/10] ignite git commit: IGNITE-2754: IGFS: Created separate processor for listing remove operation.

Posted by sb...@apache.org.
IGNITE-2754: IGFS: Created separate processor for listing remove operation.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c13339fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c13339fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c13339fe

Branch: refs/heads/ignite-1232
Commit: c13339fef50b34157d0b9d74ee42c24d89af79b2
Parents: bcb8b52
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 3 13:25:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 13:25:43 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 160 ++++++++++++-------
 1 file changed, 105 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c13339fe/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 0ba78c5..c120b9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -43,6 +43,7 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
@@ -847,7 +848,7 @@ public class IgfsMetaManager extends IgfsManager {
         if (!id2InfoPrj.putIfAbsent(fileId, newFileInfo))
             throw fsException("Failed to add file details into cache: " + newFileInfo);
 
-        id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new IgfsListingEntry(newFileInfo), false));
+        id2InfoPrj.invoke(parentId, new ListingAdd(fileName, new IgfsListingEntry(newFileInfo)));
 
         return null;
     }
@@ -956,8 +957,8 @@ public class IgfsMetaManager extends IgfsManager {
                     // 8. Actual move: remove from source parent and add to destination target.
                     IgfsListingEntry entry = srcTargetInfo.listing().get(srcName);
 
-                    id2InfoPrj.invoke(srcTargetId, new UpdateListing(srcName, entry, true));
-                    id2InfoPrj.invoke(dstTargetId, new UpdateListing(dstName, entry, false));
+                    id2InfoPrj.invoke(srcTargetId, new ListingRemove(srcName, entry.fileId()));
+                    id2InfoPrj.invoke(dstTargetId, new ListingAdd(dstName, entry));
 
                     tx.commit();
 
@@ -1093,10 +1094,10 @@ public class IgfsMetaManager extends IgfsManager {
                 ", destParentId=" + destParentId + ", destEntry=" + destEntry + ']'));
 
         // Remove listing entry from the source parent listing.
-        id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
+        id2InfoPrj.invoke(srcParentId, new ListingRemove(srcFileName, srcEntry.fileId()));
 
         // Add listing entry into the destination parent listing.
-        id2InfoPrj.invoke(destParentId, new UpdateListing(destFileName, srcEntry, false));
+        id2InfoPrj.invoke(destParentId, new ListingAdd(destFileName, srcEntry));
     }
 
     /**
@@ -1134,8 +1135,8 @@ public class IgfsMetaManager extends IgfsManager {
                     id2InfoPrj.put(newInfo.id(), newInfo);
 
                     // Add new info to trash listing.
-                    id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(),
-                        new IgfsListingEntry(newInfo), false));
+                    id2InfoPrj.invoke(TRASH_ID, new ListingAdd(newInfo.id().toString(),
+                        new IgfsListingEntry(newInfo)));
 
                     // Remove listing entries from root.
                     // Note that root directory properties and other attributes are preserved:
@@ -1233,10 +1234,10 @@ public class IgfsMetaManager extends IgfsManager {
 
                     assert victimId.equals(srcEntry.fileId());
 
-                    id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
+                    id2InfoPrj.invoke(srcParentId, new ListingRemove(srcFileName, srcEntry.fileId()));
 
                     // Add listing entry into the destination parent listing.
-                    id2InfoPrj.invoke(TRASH_ID, new UpdateListing(destFileName, srcEntry, false));
+                    id2InfoPrj.invoke(TRASH_ID, new ListingAdd(destFileName, srcEntry));
 
                     if (victimInfo.isFile())
                         // Update a file info of the removed file with a file path,
@@ -1313,12 +1314,12 @@ public class IgfsMetaManager extends IgfsManager {
                 id2InfoPrj.getAndPut(newInfo.id(), newInfo);
 
                 // Add new info to trash listing.
-                id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(),
-                    new IgfsListingEntry(newInfo), false));
+                id2InfoPrj.invoke(TRASH_ID, new ListingAdd(newInfo.id().toString(),
+                    new IgfsListingEntry(newInfo)));
 
                 // Remove listing entries from root.
                 for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet())
-                    id2InfoPrj.invoke(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
+                    id2InfoPrj.invoke(ROOT_ID, new ListingRemove(entry.getKey(), entry.getValue().fileId()));
 
                 resId = newInfo.id();
             }
@@ -1468,7 +1469,7 @@ public class IgfsMetaManager extends IgfsManager {
                         IgfsListingEntry listingEntry = parentInfo.listing().get(name);
 
                         if (listingEntry != null)
-                            id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true));
+                            id2InfoPrj.invoke(parentId, new ListingRemove(name, listingEntry.fileId()));
 
                         IgfsFileInfo deleted = id2InfoPrj.getAndRemove(id);
 
@@ -1599,7 +1600,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                 assert id2InfoPrj.get(parentId) != null;
 
-                id2InfoPrj.invoke(parentId, new UpdateListing(fileName, entry, false));
+                id2InfoPrj.invoke(parentId, new ListingAdd(fileName, entry));
             }
 
             return newInfo;
@@ -2009,9 +2010,9 @@ public class IgfsMetaManager extends IgfsManager {
                                 id2InfoPrj.put(newInfo.id(), newInfo); // Put the new one.
 
                                 id2InfoPrj.invoke(parentInfo.id(),
-                                    new UpdateListing(path.name(), parentInfo.listing().get(path.name()), true));
+                                    new ListingRemove(path.name(), parentInfo.listing().get(path.name()).fileId()));
                                 id2InfoPrj.invoke(parentInfo.id(),
-                                    new UpdateListing(path.name(), new IgfsListingEntry(newInfo), false));
+                                    new ListingAdd(path.name(), new IgfsListingEntry(newInfo)));
 
                                 IgniteInternalFuture<?> delFut = igfsCtx.data().delete(oldInfo);
                             }
@@ -3213,10 +3214,82 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
+     * Remove entry from directory listing.
+     */
+    @GridInternal
+    private static final class ListingRemove implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Name of the listing entry to remove. */
+        private String fileName;
+
+        /** ID the entry stored under {@link #fileName} is expected to have. */
+        private IgniteUuid fileId;
+
+        /**
+         * Empty constructor required for {@link Externalizable}.
+         */
+        public ListingRemove() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param fileName Name of the listing entry to remove.
+         * @param fileId ID the removed entry is expected to have.
+         */
+        public ListingRemove(String fileName, IgniteUuid fileId) {
+            this.fileName = fileName;
+            this.fileId = fileId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo fileInfo = e.getValue();
+
+            assert fileInfo != null;
+            assert fileInfo.isDirectory();
+
+            // The copy constructor already holds every entry; work on the copy, not on the stored listing.
+            Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+            IgfsListingEntry oldEntry = listing.get(fileName);
+
+            // Refuse to remove anything but the exact file the caller expects under this name.
+            if (oldEntry == null || !oldEntry.fileId().equals(fileId))
+                throw new IgniteException("Directory listing doesn't contain expected file" +
+                    " [listing=" + listing + ", fileName=" + fileName + "]");
+
+            // Drop the entry from the copy.
+            listing.remove(fileName);
+
+            e.setValue(new IgfsFileInfo(listing, fileInfo));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, fileName);
+            U.writeGridUuid(out, fileId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            fileName = U.readString(in);
+            fileId = U.readGridUuid(in);
+        }
+    }
+
+    /**
      * Update directory listing closure.
      */
     @GridInternal
-    private static final class UpdateListing implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+    private static final class ListingAdd implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -3227,30 +3300,25 @@ public class IgfsMetaManager extends IgfsManager {
         /** File ID.*/
         private IgfsListingEntry entry;
 
-        /** Update operation: remove entry from listing if {@code true} or add entry to listing if {@code false}. */
-        private boolean rmv;
-
         /**
          * Constructs update directory listing closure.
          *
          * @param fileName File name to add into parent listing.
          * @param entry Listing entry to add or remove.
-         * @param rmv Remove entry from listing if {@code true} or add entry to listing if {@code false}.
          */
-        private UpdateListing(String fileName, IgfsListingEntry entry, boolean rmv) {
+        private ListingAdd(String fileName, IgfsListingEntry entry) {
             assert fileName != null;
             assert entry != null;
 
             this.fileName = fileName;
             this.entry = entry;
-            this.rmv = rmv;
         }
 
         /**
          * Empty constructor required for {@link Externalizable}.
          *
          */
-        public UpdateListing() {
+        public ListingAdd() {
             // No-op.
         }
 
@@ -3261,30 +3329,15 @@ public class IgfsMetaManager extends IgfsManager {
             assert fileInfo != null : "File info not found for the child: " + entry.fileId();
             assert fileInfo.isDirectory();
 
-            Map<String, IgfsListingEntry> listing =
-                U.newHashMap(fileInfo.listing().size() + (rmv ? 0 : 1));
-
-            listing.putAll(fileInfo.listing());
-
-            if (rmv) {
-                IgfsListingEntry oldEntry = listing.get(fileName);
+            Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
 
-                if (oldEntry == null || !oldEntry.fileId().equals(entry.fileId()))
-                    throw new IgniteException("Directory listing doesn't contain expected file" +
-                        " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry + ']');
+            // Modify listing in-place.
+            IgfsListingEntry oldEntry = listing.put(fileName, entry);
 
-                // Modify listing in-place.
-                listing.remove(fileName);
-            }
-            else {
-                // Modify listing in-place.
-                IgfsListingEntry oldEntry = listing.put(fileName, entry);
-
-                if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId()))
-                    throw new IgniteException("Directory listing contains unexpected file" +
-                        " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry +
-                        ", oldEntry=" + oldEntry + ']');
-            }
+            if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId()))
+                throw new IgniteException("Directory listing contains unexpected file" +
+                    " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry +
+                    ", oldEntry=" + oldEntry + ']');
 
             e.setValue(new IgfsFileInfo(listing, fileInfo));
 
@@ -3295,19 +3348,17 @@ public class IgfsMetaManager extends IgfsManager {
         @Override public void writeExternal(ObjectOutput out) throws IOException {
             U.writeString(out, fileName);
             out.writeObject(entry);
-            out.writeBoolean(rmv);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
             fileName = U.readString(in);
             entry = (IgfsListingEntry)in.readObject();
-            rmv = in.readBoolean();
         }
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(UpdateListing.class, this);
+            return S.toString(ListingAdd.class, this);
         }
     }
 
@@ -3482,11 +3533,11 @@ public class IgfsMetaManager extends IgfsManager {
 
                                         assert deletedEntry != null;
 
-                                        id2InfoPrj.invoke(parentId, new UpdateListing(name, deletedEntry, true));
+                                        id2InfoPrj.invoke(parentId, new ListingRemove(name, deletedEntry.fileId()));
 
                                         // Add listing entry into the destination parent listing.
-                                        id2InfoPrj.invoke(TRASH_ID, new UpdateListing(
-                                                lowermostExistingInfo.id().toString(), deletedEntry, false));
+                                        id2InfoPrj.invoke(TRASH_ID, new ListingAdd(
+                                                lowermostExistingInfo.id().toString(), deletedEntry));
 
                                         // Update a file info of the removed file with a file path,
                                         // which will be used by delete worker for event notifications.
@@ -3505,7 +3556,7 @@ public class IgfsMetaManager extends IgfsManager {
                                         assert put;
 
                                         id2InfoPrj.invoke(parentId,
-                                                new UpdateListing(name, new IgfsListingEntry(newFileInfo), false));
+                                                new ListingAdd(name, new IgfsListingEntry(newFileInfo)));
 
                                         IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newFileInfo, parentId);
 
@@ -3679,8 +3730,7 @@ public class IgfsMetaManager extends IgfsManager {
                 throws IgniteCheckedException {
             assert childInfo != null;
 
-            id2InfoPrj.invoke(lowermostExistingId,
-                    new UpdateListing(childName, new IgfsListingEntry(childInfo), false));
+            id2InfoPrj.invoke(lowermostExistingId, new ListingAdd(childName, new IgfsListingEntry(childInfo)));
         }
 
         /**


[02/10] ignite git commit: IGNITE-1419 .Net: Added optional "raw" flag to binary type configuration. This closes #376.

Posted by sb...@apache.org.
IGNITE-1419 .Net: Added optional "raw" flag to binary type configuration. This closes #376.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cba4f4c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cba4f4c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cba4f4c0

Branch: refs/heads/ignite-1232
Commit: cba4f4c039013a79e50078a92cc2df761156c66f
Parents: 2d38eb8
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Mar 3 09:58:40 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 09:58:40 2016 +0300

----------------------------------------------------------------------
 .../utils/PlatformConfigurationUtils.java       |   6 +-
 .../Binary/BinarySelfTest.cs                    | 110 ++--
 .../Apache.Ignite.Core.csproj                   |   2 +-
 .../Binary/BinaryReflectiveSerializer.cs        | 241 ++++++++
 .../Multicast/TcpDiscoveryMulticastIpFinder.cs  |   2 +-
 .../Impl/Binary/BinaryReflectiveActions.cs      | 576 +++++++++++++------
 .../Impl/Binary/BinaryReflectiveSerializer.cs   | 218 -------
 .../Impl/Binary/BinaryWriter.cs                 |  28 +-
 8 files changed, 749 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 32ab812..c0e9f1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -594,10 +594,10 @@ import java.util.Map;
                 w.writeInt(multiFinder.getAddressRequestAttempts());
                 w.writeInt(multiFinder.getResponseWaitTime());
 
-                Integer ttl = multiFinder.getTimeToLive();
-                w.writeBoolean(ttl != null);
+                int ttl = multiFinder.getTimeToLive();
+                w.writeBoolean(ttl != -1);
 
-                if (ttl != null)
+                if (ttl != -1)
                     w.writeInt(ttl);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
index f49a28a..24ce3c8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinarySelfTest.cs
@@ -727,16 +727,22 @@ namespace Apache.Ignite.Core.Tests.Binary
          * <summary>Check write of primitive fields through reflection.</summary>
          */
         [Test]
-        public void TestPrimitiveFieldsReflective()
+        public void TestPrimitiveFieldsReflective([Values(false, true)] bool raw)
         {
-            ICollection<BinaryTypeConfiguration> typeCfgs = 
-                new List<BinaryTypeConfiguration>();
-
-            typeCfgs.Add(new BinaryTypeConfiguration(typeof(PrimitiveFieldType)));
+            var serializer = new BinaryReflectiveSerializer {RawMode = raw};
 
-            BinaryConfiguration cfg = new BinaryConfiguration {TypeConfigurations = typeCfgs};
+            Assert.AreEqual(raw, serializer.RawMode);
 
-            Marshaller marsh = new Marshaller(cfg);
+            var marsh = new Marshaller(new BinaryConfiguration
+            {
+                TypeConfigurations = new[]
+                {
+                    new BinaryTypeConfiguration(typeof (PrimitiveFieldType))
+                    {
+                        Serializer = serializer
+                    }
+                }
+            });
 
             PrimitiveFieldType obj = new PrimitiveFieldType();
 
@@ -927,15 +933,21 @@ namespace Apache.Ignite.Core.Tests.Binary
          * <summary>Check write of object with enums.</summary>
          */
         [Test]
-        public void TestEnumsReflective()
+        public void TestEnumsReflective([Values(false, true)] bool raw)
         {
             Marshaller marsh =
                 new Marshaller(new BinaryConfiguration
                 {
                     TypeConfigurations = new[]
                     {
-                        new BinaryTypeConfiguration(typeof (EnumType)),
+                        new BinaryTypeConfiguration(typeof (EnumType))
+                        {
+                            Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+                        },
                         new BinaryTypeConfiguration(typeof (TestEnum))
+                        {
+                            Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+                        }
                     }
                 });
 
@@ -951,18 +963,26 @@ namespace Apache.Ignite.Core.Tests.Binary
 
             Assert.AreEqual(obj.GetHashCode(), portObj.GetHashCode());
 
-            // Test enum field in binary form
-            var binEnum = portObj.GetField<IBinaryObject>("PEnum");
-            Assert.AreEqual(obj.PEnum.GetHashCode(), binEnum.GetHashCode());
-            Assert.AreEqual((int) obj.PEnum, binEnum.EnumValue);
-            Assert.AreEqual(obj.PEnum, binEnum.Deserialize<TestEnum>());
-            Assert.AreEqual(obj.PEnum, binEnum.Deserialize<object>());
-            Assert.AreEqual(typeof(TestEnum), binEnum.Deserialize<object>().GetType());
-            Assert.AreEqual(null, binEnum.GetField<object>("someField"));
-            Assert.IsFalse(binEnum.HasField("anyField"));
+            if (!raw)
+            {
+                // Test enum field in binary form
+                var binEnum = portObj.GetField<IBinaryObject>("PEnum");
+                Assert.AreEqual(obj.PEnum.GetHashCode(), binEnum.GetHashCode());
+                Assert.AreEqual((int) obj.PEnum, binEnum.EnumValue);
+                Assert.AreEqual(obj.PEnum, binEnum.Deserialize<TestEnum>());
+                Assert.AreEqual(obj.PEnum, binEnum.Deserialize<object>());
+                Assert.AreEqual(typeof (TestEnum), binEnum.Deserialize<object>().GetType());
+                Assert.AreEqual(null, binEnum.GetField<object>("someField"));
+                Assert.IsFalse(binEnum.HasField("anyField"));
 
-            var binEnumArr = portObj.GetField<IBinaryObject[]>("PEnumArray");
-            Assert.IsTrue(binEnumArr.Select(x => x.Deserialize<TestEnum>()).SequenceEqual(obj.PEnumArray));
+                var binEnumArr = portObj.GetField<IBinaryObject[]>("PEnumArray");
+                Assert.IsTrue(binEnumArr.Select(x => x.Deserialize<TestEnum>()).SequenceEqual(obj.PEnumArray));
+            }
+            else
+            {
+                Assert.IsFalse(portObj.HasField("PEnum"));
+                Assert.IsFalse(portObj.HasField("PEnumArray"));
+            }
 
             EnumType newObj = portObj.Deserialize<EnumType>();
 
@@ -974,14 +994,20 @@ namespace Apache.Ignite.Core.Tests.Binary
          * <summary>Check write of object with collections.</summary>
          */
         [Test]
-        public void TestCollectionsReflective()
+        public void TestCollectionsReflective([Values(false, true)] bool raw)
         {
             var marsh = new Marshaller(new BinaryConfiguration
             {
-                TypeConfigurations = new List<BinaryTypeConfiguration>
+                TypeConfigurations = new[]
                 {
-                    new BinaryTypeConfiguration(typeof (CollectionsType)),
+                    new BinaryTypeConfiguration(typeof (CollectionsType))
+                    {
+                        Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+                    },
                     new BinaryTypeConfiguration(typeof (InnerObjectType))
+                    {
+                        Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+                    }
                 }
             });
             
@@ -1043,29 +1069,39 @@ namespace Apache.Ignite.Core.Tests.Binary
          * <summary>Check write of object fields through reflective serializer.</summary>
          */
         [Test]
-        public void TestObjectReflective()
+        public void TestObjectReflective([Values(false, true)] bool raw)
         {
-            ICollection<BinaryTypeConfiguration> typeCfgs = 
-                new List<BinaryTypeConfiguration>();
-
-            typeCfgs.Add(new BinaryTypeConfiguration(typeof(OuterObjectType)));
-            typeCfgs.Add(new BinaryTypeConfiguration(typeof(InnerObjectType)));
-
-            BinaryConfiguration cfg = new BinaryConfiguration();
-
-            cfg.TypeConfigurations = typeCfgs;
-
-            Marshaller marsh = new Marshaller(cfg);
+            var marsh = new Marshaller(new BinaryConfiguration
+            {
+                TypeConfigurations = new[]
+                {
+                    new BinaryTypeConfiguration(typeof (OuterObjectType))
+                    {
+                        Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+                    },
+                    new BinaryTypeConfiguration(typeof (InnerObjectType))
+                    {
+                        Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+                    }
+                }
+            });
 
             CheckObject(marsh, new OuterObjectType(), new InnerObjectType());
         }
 
         [Test]
-        public void TestStructsReflective()
+        public void TestStructsReflective([Values(false, true)] bool raw)
         {
             var marsh = new Marshaller(new BinaryConfiguration
             {
-                TypeConfigurations = new[] {new BinaryTypeConfiguration(typeof (ReflectiveStruct))}
+                TypeConfigurations =
+                    new[]
+                    {
+                        new BinaryTypeConfiguration(typeof (ReflectiveStruct))
+                        {
+                            Serializer = new BinaryReflectiveSerializer {RawMode = raw}
+                        }
+                    }
             });
 
             var obj = new ReflectiveStruct(15, 28.8);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 1b66f0c..6acc7c4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -65,6 +65,7 @@
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="Binary\BinaryReflectiveSerializer.cs" />
     <Compile Include="Binary\Package-Info.cs" />
     <Compile Include="Cache\CacheAtomicUpdateTimeoutException.cs" />
     <Compile Include="Cache\CacheEntryProcessorException.cs" />
@@ -328,7 +329,6 @@
     <Compile Include="Impl\Binary\BinaryReaderHandleDictionary.cs" />
     <Compile Include="Impl\Binary\BinaryReader.cs" />
     <Compile Include="Impl\Binary\BinaryReflectiveActions.cs" />
-    <Compile Include="Impl\Binary\BinaryReflectiveSerializer.cs" />
     <Compile Include="Impl\Binary\Binary.cs" />
     <Compile Include="Impl\Binary\Structure\BinaryStructureTracker.cs" />
     <Compile Include="Impl\Binary\BinarySurrogateTypeDescriptor.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs
new file mode 100644
index 0000000..02703be
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Binary/BinaryReflectiveSerializer.cs
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Binary
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Reflection;
+    using Apache.Ignite.Core.Impl.Binary;
+
+    /// <summary>
+    /// Binary serializer which reflectively writes all fields except those marked with
+    /// <see cref="System.NonSerializedAttribute"/>.
+    /// <para />
+    /// Note that Java platform stores dates as a difference between current time 
+    /// and predefined absolute UTC date. Therefore, this difference is always the 
+    /// same for all time zones. .NET, in contrast, stores dates as a difference 
+    /// between current time and some predefined date relative to the current time 
+    /// zone. It means that this difference will be different as you change time zones. 
+    /// To overcome this discrepancy Ignite always converts .Net date to UTC form 
+    /// before serializing and allows user to decide whether to deserialize them 
+    /// in UTC or local form using <c>ReadTimestamp(..., true/false)</c> methods in 
+    /// <see cref="IBinaryReader"/> and <see cref="IBinaryRawReader"/>.
+    /// This serializer always reads dates in UTC form. It means that if you have
+    /// local date in any field/property, it will be implicitly converted to UTC
+    /// form after the first serialization-deserialization cycle. 
+    /// </summary>
+    public sealed class BinaryReflectiveSerializer : IBinarySerializer
+    {
+        /** Cached binding flags. */
+        private const BindingFlags Flags = BindingFlags.Instance | BindingFlags.Public | 
+            BindingFlags.NonPublic | BindingFlags.DeclaredOnly;
+
+        /** Cached type descriptors. */
+        private readonly IDictionary<Type, Descriptor> _types = new Dictionary<Type, Descriptor>();
+
+        /** Raw mode flag. */
+        private bool _rawMode;
+
+        /// <summary>
+        /// Gets or sets a value indicating whether raw mode serialization should be used.
+        /// <para />
+        /// Raw mode does not include field names, improving performance and memory usage.
+        /// However, queries do not support raw objects. Cannot be changed once a type is registered.
+        /// </summary>
+        public bool RawMode
+        {
+            get { return _rawMode; }
+            set
+            {
+                if (_types.Count > 0)
+                    throw new InvalidOperationException(typeof (BinaryReflectiveSerializer).Name +
+                        ".RawMode cannot be changed after first serialization.");
+
+                _rawMode = value;
+            }
+        }
+
+        /// <summary>
+        /// Write binary object: delegates to the object itself if it is IBinarizable, else uses its registered descriptor.
+        /// </summary>
+        /// <param name="obj">Object.</param>
+        /// <param name="writer">Writer.</param>
+        /// <exception cref="BinaryObjectException">Object is not IBinarizable and its type is not registered in this serializer.</exception>
+        public void WriteBinary(object obj, IBinaryWriter writer)
+        {
+            var binarizable = obj as IBinarizable;
+
+            if (binarizable != null)
+                binarizable.WriteBinary(writer);
+            else
+                GetDescriptor(obj).Write(obj, writer);
+        }
+
+        /// <summary>
+        /// Read binary object: delegates to the object itself if it is IBinarizable, else uses its registered descriptor.
+        /// </summary>
+        /// <param name="obj">Instantiated empty object.</param>
+        /// <param name="reader">Reader.</param>
+        /// <exception cref="BinaryObjectException">Object is not IBinarizable and its type is not registered in this serializer.</exception>
+        public void ReadBinary(object obj, IBinaryReader reader)
+        {
+            var binarizable = obj as IBinarizable;
+            
+            if (binarizable != null)
+                binarizable.ReadBinary(reader);
+            else
+                GetDescriptor(obj).Read(obj, reader);
+        }
+
+        /// <summary>Registers a type: gathers its instance fields (own and inherited) that are not marked non-serialized, rejects field ID collisions and caches a descriptor. Does nothing when an IBinarizable interface is found on the type.</summary>
+        /// <param name="type">Type.</param>
+        /// <param name="typeId">Type ID.</param>
+        /// <param name="converter">Name converter.</param>
+        /// <param name="idMapper">ID mapper.</param>
+        internal void Register(Type type, int typeId, IBinaryNameMapper converter,
+            IBinaryIdMapper idMapper)
+        {
+            if (type.GetInterface(typeof(IBinarizable).Name) != null)
+                return;
+
+            List<FieldInfo> fields = new List<FieldInfo>();
+
+            Type curType = type;
+
+            while (curType != null)
+            {
+                foreach (FieldInfo field in curType.GetFields(Flags))
+                {
+                    if (!field.IsNotSerialized)
+                        fields.Add(field);
+                }
+
+                curType = curType.BaseType;
+            }
+
+            IDictionary<int, string> idMap = new Dictionary<int, string>();
+
+            foreach (FieldInfo field in fields)
+            {
+                string fieldName = BinaryUtils.CleanFieldName(field.Name);
+
+                int fieldId = BinaryUtils.FieldId(typeId, fieldName, converter, idMapper);
+
+                if (idMap.ContainsKey(fieldId))
+                {
+                    throw new BinaryObjectException("Conflicting field IDs [type=" +
+                        type.Name + ", field1=" + idMap[fieldId] + ", field2=" + fieldName +
+                        ", fieldId=" + fieldId + ']');
+                }
+                
+                idMap[fieldId] = fieldName;
+            }
+
+            fields.Sort(Compare);
+
+            Descriptor desc = new Descriptor(fields, _rawMode);
+
+            _types[type] = desc;
+        }
+
+        /// <summary>
+        /// Gets the descriptor cached for the runtime type of the object; throws if that type was never registered.
+        /// </summary>
+        private Descriptor GetDescriptor(object obj)
+        {
+            var type = obj.GetType();
+
+            Descriptor desc;
+
+            if (!_types.TryGetValue(type, out desc))
+                throw new BinaryObjectException("Type is not registered in serializer: " + type.Name);
+
+            return desc;
+        }
+        
+        /// <summary>
+        /// Compares two fields by their cleaned names using an ordinal, case-insensitive comparison.
+        /// </summary>
+        private static int Compare(FieldInfo info1, FieldInfo info2) {
+            string name1 = BinaryUtils.CleanFieldName(info1.Name);
+            string name2 = BinaryUtils.CleanFieldName(info2.Name);
+
+            return string.Compare(name1, name2, StringComparison.OrdinalIgnoreCase);
+        }
+
+        /// <summary>
+        /// Type descriptor: holds one write action and one read action per field, in the order the fields were supplied.
+        /// </summary>
+        private class Descriptor
+        {
+            /** Write actions to be performed. */
+            private readonly List<BinaryReflectiveWriteAction> _wActions;
+
+            /** Read actions to be performed. */
+            private readonly List<BinaryReflectiveReadAction> _rActions;
+
+            /// <summary>
+            /// Constructor. Builds the write/read action pair for every field via BinaryReflectiveActions.GetTypeActions.
+            /// </summary>
+            /// <param name="fields">Fields.</param>
+            /// <param name="raw">Raw mode flag, passed through to GetTypeActions.</param>
+            public Descriptor(List<FieldInfo> fields, bool raw)
+            {
+                _wActions = new List<BinaryReflectiveWriteAction>(fields.Count);
+                _rActions = new List<BinaryReflectiveReadAction>(fields.Count);
+
+                foreach (FieldInfo field in fields)
+                {
+                    BinaryReflectiveWriteAction writeAction;
+                    BinaryReflectiveReadAction readAction;
+
+                    BinaryReflectiveActions.GetTypeActions(field, out writeAction, out readAction, raw);
+
+                    _wActions.Add(writeAction);
+                    _rActions.Add(readAction);
+                }
+            }
+
+            /// <summary>
+            /// Write object by invoking every write action in order.
+            /// </summary>
+            /// <param name="obj">Object.</param>
+            /// <param name="writer">Writer.</param>
+            public void Write(object obj, IBinaryWriter writer)
+            {
+                int cnt = _wActions.Count;
+
+                for (int i = 0; i < cnt; i++)
+                    _wActions[i](obj, writer);                   
+            }
+
+            /// <summary>
+            /// Read object by invoking every read action in order.
+            /// </summary>
+            /// <param name="obj">Object.</param>
+            /// <param name="reader">Reader.</param>
+            public void Read(object obj, IBinaryReader reader)
+            {
+                int cnt = _rActions.Count;
+
+                for (int i = 0; i < cnt; i++ )
+                    _rActions[i](obj, reader);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
index 5df5ea1..4581a04 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Discovery/Tcp/Multicast/TcpDiscoveryMulticastIpFinder.cs
@@ -91,7 +91,7 @@ namespace Apache.Ignite.Core.Discovery.Tcp.Multicast
         /// Gets or sets the time to live for multicast packets sent out on this
         /// IP finder in order to control the scope of the multicast.
         /// </summary>
-        public byte? TimeToLive { get; set; }  // TODO: Nullable?
+        public byte? TimeToLive { get; set; } 
 
         /// <summary>
         /// Initializes a new instance of the <see cref="TcpDiscoveryMulticastIpFinder"/> class.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
index 15509fc..8b5e2a1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveActions.cs
@@ -49,29 +49,53 @@ namespace Apache.Ignite.Core.Impl.Binary
         private static readonly MethodInfo MthdReadEnum =
             typeof(IBinaryReader).GetMethod("ReadEnum", new[] { typeof(string) });
 
+        /** Method: read enum. */
+        private static readonly MethodInfo MthdReadEnumRaw = typeof (IBinaryRawReader).GetMethod("ReadEnum");
+
         /** Method: read enum array. */
         private static readonly MethodInfo MthdReadEnumArray =
             typeof(IBinaryReader).GetMethod("ReadEnumArray", new[] { typeof(string) });
 
+        /** Method: read enum array. */
+        private static readonly MethodInfo MthdReadEnumArrayRaw = typeof(IBinaryRawReader).GetMethod("ReadEnumArray");
+
         /** Method: read array. */
         private static readonly MethodInfo MthdReadObjArray =
             typeof(IBinaryReader).GetMethod("ReadArray", new[] { typeof(string) });
 
+        /** Method: read array. */
+        private static readonly MethodInfo MthdReadObjArrayRaw = typeof(IBinaryRawReader).GetMethod("ReadArray");
+
         /** Method: read object. */
         private static readonly MethodInfo MthdReadObj=
             typeof(IBinaryReader).GetMethod("ReadObject", new[] { typeof(string) });
 
+        /** Method: read object. */
+        private static readonly MethodInfo MthdReadObjRaw = typeof(IBinaryRawReader).GetMethod("ReadObject");
+
         /** Method: write enum array. */
-        private static readonly MethodInfo MthdWriteEnumArray =
-            typeof(IBinaryWriter).GetMethod("WriteEnumArray");
+        private static readonly MethodInfo MthdWriteEnumArray = typeof(IBinaryWriter).GetMethod("WriteEnumArray");
+
+        /** Method: write enum array. */
+        private static readonly MethodInfo MthdWriteEnumArrayRaw = typeof(IBinaryRawWriter).GetMethod("WriteEnumArray");
 
         /** Method: write array. */
-        private static readonly MethodInfo MthdWriteObjArray =
-            typeof(IBinaryWriter).GetMethod("WriteArray");
+        private static readonly MethodInfo MthdWriteObjArray = typeof(IBinaryWriter).GetMethod("WriteArray");
 
-        /** Method: read object. */
-        private static readonly MethodInfo MthdWriteObj =
-            typeof(IBinaryWriter).GetMethod("WriteObject");
+        /** Method: write array. */
+        private static readonly MethodInfo MthdWriteObjArrayRaw = typeof(IBinaryRawWriter).GetMethod("WriteArray");
+
+        /** Method: write object. */
+        private static readonly MethodInfo MthdWriteObj = typeof(IBinaryWriter).GetMethod("WriteObject");
+
+        /** Method: write object. */
+        private static readonly MethodInfo MthdWriteObjRaw = typeof(IBinaryRawWriter).GetMethod("WriteObject");
+
+        /** Method: raw writer */
+        private static readonly MethodInfo MthdGetRawWriter = typeof(IBinaryWriter).GetMethod("GetRawWriter");
+
+        /** Method: raw reader */
+        private static readonly MethodInfo MthdGetRawReader = typeof(IBinaryReader).GetMethod("GetRawReader");
 
         /// <summary>
         /// Lookup read/write actions for the given type.
@@ -79,17 +103,18 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="field">The field.</param>
         /// <param name="writeAction">Write action.</param>
         /// <param name="readAction">Read action.</param>
-        public static void TypeActions(FieldInfo field, out BinaryReflectiveWriteAction writeAction, 
-            out BinaryReflectiveReadAction readAction)
+        /// <param name="raw">Raw mode.</param>
+        public static void GetTypeActions(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
+            out BinaryReflectiveReadAction readAction, bool raw)
         {
             var type = field.FieldType;
 
             if (type.IsPrimitive)
-                HandlePrimitive(field, out writeAction, out readAction);
+                HandlePrimitive(field, out writeAction, out readAction, raw);
             else if (type.IsArray)
-                HandleArray(field, out writeAction, out readAction);
+                HandleArray(field, out writeAction, out readAction, raw);
             else
-                HandleOther(field, out writeAction, out readAction);
+                HandleOther(field, out writeAction, out readAction, raw);
         }
 
         /// <summary>
@@ -98,71 +123,108 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="field">The field.</param>
         /// <param name="writeAction">Write action.</param>
         /// <param name="readAction">Read action.</param>
+        /// <param name="raw">Raw mode.</param>
         /// <exception cref="IgniteException">Unsupported primitive type:  + type.Name</exception>
         private static void HandlePrimitive(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
-            out BinaryReflectiveReadAction readAction)
+            out BinaryReflectiveReadAction readAction, bool raw)
         {
             var type = field.FieldType;
 
-            if (type == typeof(bool))
+            if (type == typeof (bool))
             {
-                writeAction = GetWriter<bool>(field, (f, w, o) => w.WriteBoolean(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadBoolean(f));
+                writeAction = raw
+                    ? GetRawWriter<bool>(field, (w, o) => w.WriteBoolean(o))
+                    : GetWriter<bool>(field, (f, w, o) => w.WriteBoolean(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadBoolean())
+                    : GetReader(field, (f, r) => r.ReadBoolean(f));
             }
-            else if (type == typeof(sbyte))
+            else if (type == typeof (sbyte))
             {
-                writeAction = GetWriter<sbyte>(field, (f, w, o) => w.WriteByte(f, unchecked((byte) o)));
-                readAction = GetReader(field, (f, r) => unchecked ((sbyte)r.ReadByte(f)));
+                writeAction = raw
+                    ? GetRawWriter<sbyte>(field, (w, o) => w.WriteByte(unchecked((byte) o)))
+                    : GetWriter<sbyte>(field, (f, w, o) => w.WriteByte(f, unchecked((byte) o)));
+                readAction = raw
+                    ? GetRawReader(field, r => unchecked ((sbyte) r.ReadByte()))
+                    : GetReader(field, (f, r) => unchecked ((sbyte) r.ReadByte(f)));
             }
-            else if (type == typeof(byte))
+            else if (type == typeof (byte))
             {
-                writeAction = GetWriter<byte>(field, (f, w, o) => w.WriteByte(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadByte(f));
+                writeAction = raw
+                    ? GetRawWriter<byte>(field, (w, o) => w.WriteByte(o))
+                    : GetWriter<byte>(field, (f, w, o) => w.WriteByte(f, o));
+                readAction = raw ? GetRawReader(field, r => r.ReadByte()) : GetReader(field, (f, r) => r.ReadByte(f));
             }
-            else if (type == typeof(short))
+            else if (type == typeof (short))
             {
-                writeAction = GetWriter<short>(field, (f, w, o) => w.WriteShort(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadShort(f));
+                writeAction = raw
+                    ? GetRawWriter<short>(field, (w, o) => w.WriteShort(o))
+                    : GetWriter<short>(field, (f, w, o) => w.WriteShort(f, o));
+                readAction = raw ? GetRawReader(field, r => r.ReadShort()) : GetReader(field, (f, r) => r.ReadShort(f));
             }
-            else if (type == typeof(ushort))
+            else if (type == typeof (ushort))
             {
-                writeAction = GetWriter<ushort>(field, (f, w, o) => w.WriteShort(f, unchecked((short) o)));
-                readAction = GetReader(field, (f, r) => unchecked((ushort) r.ReadShort(f)));
+                writeAction = raw
+                    ? GetRawWriter<ushort>(field, (w, o) => w.WriteShort(unchecked((short) o)))
+                    : GetWriter<ushort>(field, (f, w, o) => w.WriteShort(f, unchecked((short) o)));
+                readAction = raw
+                    ? GetRawReader(field, r => unchecked((ushort) r.ReadShort()))
+                    : GetReader(field, (f, r) => unchecked((ushort) r.ReadShort(f)));
             }
-            else if (type == typeof(char))
+            else if (type == typeof (char))
             {
-                writeAction = GetWriter<char>(field, (f, w, o) => w.WriteChar(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadChar(f));
+                writeAction = raw
+                    ? GetRawWriter<char>(field, (w, o) => w.WriteChar(o))
+                    : GetWriter<char>(field, (f, w, o) => w.WriteChar(f, o));
+                readAction = raw ? GetRawReader(field, r => r.ReadChar()) : GetReader(field, (f, r) => r.ReadChar(f));
             }
-            else if (type == typeof(int))
+            else if (type == typeof (int))
             {
-                writeAction = GetWriter<int>(field, (f, w, o) => w.WriteInt(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadInt(f));
+                writeAction = raw
+                    ? GetRawWriter<int>(field, (w, o) => w.WriteInt(o))
+                    : GetWriter<int>(field, (f, w, o) => w.WriteInt(f, o));
+                readAction = raw ? GetRawReader(field, r => r.ReadInt()) : GetReader(field, (f, r) => r.ReadInt(f));
             }
-            else if (type == typeof(uint))
+            else if (type == typeof (uint))
             {
-                writeAction = GetWriter<uint>(field, (f, w, o) => w.WriteInt(f, unchecked((int) o)));
-                readAction = GetReader(field, (f, r) => unchecked((uint) r.ReadInt(f)));
+                writeAction = raw
+                    ? GetRawWriter<uint>(field, (w, o) => w.WriteInt(unchecked((int) o)))
+                    : GetWriter<uint>(field, (f, w, o) => w.WriteInt(f, unchecked((int) o)));
+                readAction = raw
+                    ? GetRawReader(field, r => unchecked((uint) r.ReadInt()))
+                    : GetReader(field, (f, r) => unchecked((uint) r.ReadInt(f)));
             }
-            else if (type == typeof(long))
+            else if (type == typeof (long))
             {
-                writeAction = GetWriter<long>(field, (f, w, o) => w.WriteLong(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadLong(f));
+                writeAction = raw
+                    ? GetRawWriter<long>(field, (w, o) => w.WriteLong(o))
+                    : GetWriter<long>(field, (f, w, o) => w.WriteLong(f, o));
+                readAction = raw ? GetRawReader(field, r => r.ReadLong()) : GetReader(field, (f, r) => r.ReadLong(f));
             }
-            else if (type == typeof(ulong))
+            else if (type == typeof (ulong))
             {
-                writeAction = GetWriter<ulong>(field, (f, w, o) => w.WriteLong(f, unchecked((long) o)));
-                readAction = GetReader(field, (f, r) => unchecked((ulong) r.ReadLong(f)));
+                writeAction = raw
+                    ? GetRawWriter<ulong>(field, (w, o) => w.WriteLong(unchecked((long) o)))
+                    : GetWriter<ulong>(field, (f, w, o) => w.WriteLong(f, unchecked((long) o)));
+                readAction = raw
+                    ? GetRawReader(field, r => unchecked((ulong) r.ReadLong()))
+                    : GetReader(field, (f, r) => unchecked((ulong) r.ReadLong(f)));
             }
-            else if (type == typeof(float))
+            else if (type == typeof (float))
             {
-                writeAction = GetWriter<float>(field, (f, w, o) => w.WriteFloat(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadFloat(f));
+                writeAction = raw
+                    ? GetRawWriter<float>(field, (w, o) => w.WriteFloat(o))
+                    : GetWriter<float>(field, (f, w, o) => w.WriteFloat(f, o));
+                readAction = raw ? GetRawReader(field, r => r.ReadFloat()) : GetReader(field, (f, r) => r.ReadFloat(f));
             }
-            else if (type == typeof(double))
+            else if (type == typeof (double))
             {
-                writeAction = GetWriter<double>(field, (f, w, o) => w.WriteDouble(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadDouble(f));
+                writeAction = raw
+                    ? GetRawWriter<double>(field, (w, o) => w.WriteDouble(o))
+                    : GetWriter<double>(field, (f, w, o) => w.WriteDouble(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadDouble())
+                    : GetReader(field, (f, r) => r.ReadDouble(f));
             }
             else
                 throw new IgniteException("Unsupported primitive type: " + type.Name);
@@ -174,96 +236,165 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="field">The field.</param>
         /// <param name="writeAction">Write action.</param>
         /// <param name="readAction">Read action.</param>
+        /// <param name="raw">Raw mode.</param>
         private static void HandleArray(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
-            out BinaryReflectiveReadAction readAction)
+            out BinaryReflectiveReadAction readAction, bool raw)
         {
             Type elemType = field.FieldType.GetElementType();
 
-            if (elemType == typeof(bool))
-            {
-                writeAction = GetWriter<bool[]>(field, (f, w, o) => w.WriteBooleanArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadBooleanArray(f));
-            }
-            else if (elemType == typeof(byte))
-            {
-                writeAction = GetWriter<byte[]>(field, (f, w, o) => w.WriteByteArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadByteArray(f));
-            }
-            else if (elemType == typeof(sbyte))
-            {
-                writeAction = GetWriter<sbyte[]>(field, (f, w, o) => w.WriteByteArray(f, (byte[]) (Array) o));
-                readAction = GetReader(field, (f, r) => (sbyte[]) (Array) r.ReadByteArray(f));
-            }
-            else if (elemType == typeof(short))
-            {
-                writeAction = GetWriter<short[]>(field, (f, w, o) => w.WriteShortArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadShortArray(f));
-            }
-            else if (elemType == typeof(ushort))
-            {
-                writeAction = GetWriter<ushort[]>(field, (f, w, o) => w.WriteShortArray(f, (short[]) (Array) o));
-                readAction = GetReader(field, (f, r) => (ushort[]) (Array) r.ReadShortArray(f));
-            }
-            else if (elemType == typeof(char))
-            {
-                writeAction = GetWriter<char[]>(field, (f, w, o) => w.WriteCharArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadCharArray(f));
-            }
-            else if (elemType == typeof(int))
-            {
-                writeAction = GetWriter<int[]>(field, (f, w, o) => w.WriteIntArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadIntArray(f));
-            }
-            else if (elemType == typeof(uint))
-            {
-                writeAction = GetWriter<uint[]>(field, (f, w, o) => w.WriteIntArray(f, (int[]) (Array) o));
-                readAction = GetReader(field, (f, r) => (uint[]) (Array) r.ReadIntArray(f));
+            if (elemType == typeof (bool))
+            {
+                writeAction = raw
+                    ? GetRawWriter<bool[]>(field, (w, o) => w.WriteBooleanArray(o))
+                    : GetWriter<bool[]>(field, (f, w, o) => w.WriteBooleanArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadBooleanArray())
+                    : GetReader(field, (f, r) => r.ReadBooleanArray(f));
+            }
+            else if (elemType == typeof (byte))
+            {
+                writeAction = raw
+                    ? GetRawWriter<byte[]>(field, (w, o) => w.WriteByteArray(o))
+                    : GetWriter<byte[]>(field, (f, w, o) => w.WriteByteArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadByteArray())
+                    : GetReader(field, (f, r) => r.ReadByteArray(f));
+            }
+            else if (elemType == typeof (sbyte))
+            {
+                writeAction = raw
+                    ? GetRawWriter<sbyte[]>(field, (w, o) => w.WriteByteArray((byte[]) (Array) o))
+                    : GetWriter<sbyte[]>(field, (f, w, o) => w.WriteByteArray(f, (byte[]) (Array) o));
+                readAction = raw
+                    ? GetRawReader(field, r => (sbyte[]) (Array) r.ReadByteArray())
+                    : GetReader(field, (f, r) => (sbyte[]) (Array) r.ReadByteArray(f));
+            }
+            else if (elemType == typeof (short))
+            {
+                writeAction = raw
+                    ? GetRawWriter<short[]>(field, (w, o) => w.WriteShortArray(o))
+                    : GetWriter<short[]>(field, (f, w, o) => w.WriteShortArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadShortArray())
+                    : GetReader(field, (f, r) => r.ReadShortArray(f));
+            }
+            else if (elemType == typeof (ushort))
+            {
+                writeAction = raw
+                    ? GetRawWriter<ushort[]>(field, (w, o) => w.WriteShortArray((short[]) (Array) o))
+                    : GetWriter<ushort[]>(field, (f, w, o) => w.WriteShortArray(f, (short[]) (Array) o));
+                readAction = raw
+                    ? GetRawReader(field, r => (ushort[]) (Array) r.ReadShortArray())
+                    : GetReader(field, (f, r) => (ushort[]) (Array) r.ReadShortArray(f));
+            }
+            else if (elemType == typeof (char))
+            {
+                writeAction = raw
+                    ? GetRawWriter<char[]>(field, (w, o) => w.WriteCharArray(o))
+                    : GetWriter<char[]>(field, (f, w, o) => w.WriteCharArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadCharArray())
+                    : GetReader(field, (f, r) => r.ReadCharArray(f));
+            }
+            else if (elemType == typeof (int))
+            {
+                writeAction = raw
+                    ? GetRawWriter<int[]>(field, (w, o) => w.WriteIntArray(o))
+                    : GetWriter<int[]>(field, (f, w, o) => w.WriteIntArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadIntArray())
+                    : GetReader(field, (f, r) => r.ReadIntArray(f));
+            }
+            else if (elemType == typeof (uint))
+            {
+                writeAction = raw
+                    ? GetRawWriter<uint[]>(field, (w, o) => w.WriteIntArray((int[]) (Array) o))
+                    : GetWriter<uint[]>(field, (f, w, o) => w.WriteIntArray(f, (int[]) (Array) o));
+                readAction = raw
+                    ? GetRawReader(field, r => (uint[]) (Array) r.ReadIntArray())
+                    : GetReader(field, (f, r) => (uint[]) (Array) r.ReadIntArray(f));
+            }
+            else if (elemType == typeof (long))
+            {
+                writeAction = raw
+                    ? GetRawWriter<long[]>(field, (w, o) => w.WriteLongArray(o))
+                    : GetWriter<long[]>(field, (f, w, o) => w.WriteLongArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadLongArray())
+                    : GetReader(field, (f, r) => r.ReadLongArray(f));
+            }
+            else if (elemType == typeof (ulong))
+            {
+                writeAction = raw
+                    ? GetRawWriter<ulong[]>(field, (w, o) => w.WriteLongArray((long[]) (Array) o))
+                    : GetWriter<ulong[]>(field, (f, w, o) => w.WriteLongArray(f, (long[]) (Array) o));
+                readAction = raw
+                    ? GetRawReader(field, r => (ulong[]) (Array) r.ReadLongArray())
+                    : GetReader(field, (f, r) => (ulong[]) (Array) r.ReadLongArray(f));
+            }
+            else if (elemType == typeof (float))
+            {
+                writeAction = raw
+                    ? GetRawWriter<float[]>(field, (w, o) => w.WriteFloatArray(o))
+                    : GetWriter<float[]>(field, (f, w, o) => w.WriteFloatArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadFloatArray())
+                    : GetReader(field, (f, r) => r.ReadFloatArray(f));
+            }
+            else if (elemType == typeof (double))
+            {
+                writeAction = raw
+                    ? GetRawWriter<double[]>(field, (w, o) => w.WriteDoubleArray(o))
+                    : GetWriter<double[]>(field, (f, w, o) => w.WriteDoubleArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadDoubleArray())
+                    : GetReader(field, (f, r) => r.ReadDoubleArray(f));
+            }
+            else if (elemType == typeof (decimal?))
+            {
+                writeAction = raw
+                    ? GetRawWriter<decimal?[]>(field, (w, o) => w.WriteDecimalArray(o))
+                    : GetWriter<decimal?[]>(field, (f, w, o) => w.WriteDecimalArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadDecimalArray())
+                    : GetReader(field, (f, r) => r.ReadDecimalArray(f));
+            }
+            else if (elemType == typeof (string))
+            {
+                writeAction = raw
+                    ? GetRawWriter<string[]>(field, (w, o) => w.WriteStringArray(o))
+                    : GetWriter<string[]>(field, (f, w, o) => w.WriteStringArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadStringArray())
+                    : GetReader(field, (f, r) => r.ReadStringArray(f));
+            }
+            else if (elemType == typeof (Guid?))
+            {
+                writeAction = raw
+                    ? GetRawWriter<Guid?[]>(field, (w, o) => w.WriteGuidArray(o))
+                    : GetWriter<Guid?[]>(field, (f, w, o) => w.WriteGuidArray(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadGuidArray())
+                    : GetReader(field, (f, r) => r.ReadGuidArray(f));
             }
-            else if (elemType == typeof(long))
-            {
-                writeAction = GetWriter<long[]>(field, (f, w, o) => w.WriteLongArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadLongArray(f));
-            }
-            else if (elemType == typeof(ulong))
-            {
-                writeAction = GetWriter<ulong[]>(field, (f, w, o) => w.WriteLongArray(f, (long[]) (Array) o));
-                readAction = GetReader(field, (f, r) => (ulong[]) (Array) r.ReadLongArray(f));
-            }
-            else if (elemType == typeof(float))
-            {
-                writeAction = GetWriter<float[]>(field, (f, w, o) => w.WriteFloatArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadFloatArray(f));
-            }
-            else if (elemType == typeof(double))
-            {
-                writeAction = GetWriter<double[]>(field, (f, w, o) => w.WriteDoubleArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadDoubleArray(f));
-            }
-            else if (elemType == typeof(decimal?))
-            {
-                writeAction = GetWriter<decimal?[]>(field, (f, w, o) => w.WriteDecimalArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadDecimalArray(f));
-            }
-            else if (elemType == typeof(string))
-            {
-                writeAction = GetWriter<string[]>(field, (f, w, o) => w.WriteStringArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadStringArray(f));
-            }
-            else if (elemType == typeof(Guid?))
-            {
-                writeAction = GetWriter<Guid?[]>(field, (f, w, o) => w.WriteGuidArray(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadGuidArray(f));
-            } 
             else if (elemType.IsEnum)
             {
-                writeAction = GetWriter(field, MthdWriteEnumArray, elemType);
-                readAction = GetReader(field, MthdReadEnumArray, elemType);
+                writeAction = raw
+                    ? GetRawWriter(field, MthdWriteEnumArrayRaw, elemType)
+                    : GetWriter(field, MthdWriteEnumArray, elemType);
+                readAction = raw
+                    ? GetRawReader(field, MthdReadEnumArrayRaw, elemType)
+                    : GetReader(field, MthdReadEnumArray, elemType);
             }
             else
             {
-                writeAction = GetWriter(field, MthdWriteObjArray, elemType);
-                readAction = GetReader(field, MthdReadObjArray, elemType);
-            }  
+                writeAction = raw
+                    ? GetRawWriter(field, MthdWriteObjArrayRaw, elemType)
+                    : GetWriter(field, MthdWriteObjArray, elemType);
+                readAction = raw
+                    ? GetRawReader(field, MthdReadObjArrayRaw, elemType)
+                    : GetReader(field, MthdReadObjArray, elemType);
+            }
         }
 
         /// <summary>
@@ -272,57 +403,84 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="field">The field.</param>
         /// <param name="writeAction">Write action.</param>
         /// <param name="readAction">Read action.</param>
+        /// <param name="raw">Raw mode.</param>
         private static void HandleOther(FieldInfo field, out BinaryReflectiveWriteAction writeAction,
-            out BinaryReflectiveReadAction readAction)
+            out BinaryReflectiveReadAction readAction, bool raw)
         {
             var type = field.FieldType;
 
             var genericDef = type.IsGenericType ? type.GetGenericTypeDefinition() : null;
 
-            bool nullable = genericDef == typeof(Nullable<>);
+            bool nullable = genericDef == typeof (Nullable<>);
 
             var nullableType = nullable ? type.GetGenericArguments()[0] : null;
 
-            if (type == typeof(decimal))
+            if (type == typeof (decimal))
             {
-                writeAction = GetWriter<decimal>(field, (f, w, o) => w.WriteDecimal(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadDecimal(f));
+                writeAction = raw
+                    ? GetRawWriter<decimal>(field, (w, o) => w.WriteDecimal(o))
+                    : GetWriter<decimal>(field, (f, w, o) => w.WriteDecimal(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadDecimal())
+                    : GetReader(field, (f, r) => r.ReadDecimal(f));
             }
-            else if (type == typeof(string))
+            else if (type == typeof (string))
             {
-                writeAction = GetWriter<string>(field, (f, w, o) => w.WriteString(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadString(f));
+                writeAction = raw
+                    ? GetRawWriter<string>(field, (w, o) => w.WriteString(o))
+                    : GetWriter<string>(field, (f, w, o) => w.WriteString(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadString())
+                    : GetReader(field, (f, r) => r.ReadString(f));
             }
-            else if (type == typeof(Guid))
+            else if (type == typeof (Guid))
             {
-                writeAction = GetWriter<Guid>(field, (f, w, o) => w.WriteGuid(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadObject<Guid>(f));
+                writeAction = raw
+                    ? GetRawWriter<Guid>(field, (w, o) => w.WriteGuid(o))
+                    : GetWriter<Guid>(field, (f, w, o) => w.WriteGuid(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadObject<Guid>())
+                    : GetReader(field, (f, r) => r.ReadObject<Guid>(f));
             }
-            else if (nullable && nullableType == typeof(Guid))
+            else if (nullable && nullableType == typeof (Guid))
             {
-                writeAction = GetWriter<Guid?>(field, (f, w, o) => w.WriteGuid(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadGuid(f));
-            } 
+                writeAction = raw
+                    ? GetRawWriter<Guid?>(field, (w, o) => w.WriteGuid(o))
+                    : GetWriter<Guid?>(field, (f, w, o) => w.WriteGuid(f, o));
+                readAction = raw ? GetRawReader(field, r => r.ReadGuid()) : GetReader(field, (f, r) => r.ReadGuid(f));
+            }
             else if (type.IsEnum)
             {
-                writeAction = GetWriter<object>(field, (f, w, o) => w.WriteEnum(f, o), true);
-                readAction = GetReader(field, MthdReadEnum);
+                writeAction = raw
+                    ? GetRawWriter<object>(field, (w, o) => w.WriteEnum(o), true)
+                    : GetWriter<object>(field, (f, w, o) => w.WriteEnum(f, o), true);
+                readAction = raw ? GetRawReader(field, MthdReadEnumRaw) : GetReader(field, MthdReadEnum);
             }
-            else if (type == BinaryUtils.TypDictionary || type.GetInterface(BinaryUtils.TypDictionary.FullName) != null && !type.IsGenericType)
+            else if (type == BinaryUtils.TypDictionary ||
+                     type.GetInterface(BinaryUtils.TypDictionary.FullName) != null && !type.IsGenericType)
             {
-                writeAction = GetWriter<IDictionary>(field, (f, w, o) => w.WriteDictionary(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadDictionary(f));
+                writeAction = raw
+                    ? GetRawWriter<IDictionary>(field, (w, o) => w.WriteDictionary(o))
+                    : GetWriter<IDictionary>(field, (f, w, o) => w.WriteDictionary(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadDictionary())
+                    : GetReader(field, (f, r) => r.ReadDictionary(f));
             }
-            else if (type == BinaryUtils.TypCollection || type.GetInterface(BinaryUtils.TypCollection.FullName) != null && !type.IsGenericType)
+            else if (type == BinaryUtils.TypCollection ||
+                     type.GetInterface(BinaryUtils.TypCollection.FullName) != null && !type.IsGenericType)
             {
-                writeAction = GetWriter<ICollection>(field, (f, w, o) => w.WriteCollection(f, o));
-                readAction = GetReader(field, (f, r) => r.ReadCollection(f));
+                writeAction = raw
+                    ? GetRawWriter<ICollection>(field, (w, o) => w.WriteCollection(o))
+                    : GetWriter<ICollection>(field, (f, w, o) => w.WriteCollection(f, o));
+                readAction = raw
+                    ? GetRawReader(field, r => r.ReadCollection())
+                    : GetReader(field, (f, r) => r.ReadCollection(f));
             }
             else
             {
-                writeAction = GetWriter(field, MthdWriteObj);
-                readAction = GetReader(field, MthdReadObj);
-            }                
+                writeAction = raw ? GetRawWriter(field, MthdWriteObjRaw) : GetWriter(field, MthdWriteObj);
+                readAction = raw ? GetRawReader(field, MthdReadObjRaw) : GetReader(field, MthdReadObj);
+            }
         }
 
         /// <summary>
@@ -334,6 +492,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             Debug.Assert(field != null);
             Debug.Assert(field.DeclaringType != null);   // non-static
+            Debug.Assert(write != null);
 
             // Get field value
             var targetParam = Expression.Parameter(typeof(object));
@@ -353,13 +512,59 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
+        /// Gets the raw writer with a specified write action.
+        /// </summary>
+        private static BinaryReflectiveWriteAction GetRawWriter<T>(FieldInfo field,
+            Expression<Action<IBinaryRawWriter, T>> write,
+            bool convertFieldValToObject = false)
+        {
+            Debug.Assert(field != null);
+            Debug.Assert(field.DeclaringType != null);   // non-static
+            Debug.Assert(write != null);
+
+            // Get field value
+            var targetParam = Expression.Parameter(typeof(object));
+            var targetParamConverted = Expression.Convert(targetParam, field.DeclaringType);
+            Expression fldExpr = Expression.Field(targetParamConverted, field);
+
+            if (convertFieldValToObject)
+                fldExpr = Expression.Convert(fldExpr, typeof (object));
+
+            // Call Writer method
+            var writerParam = Expression.Parameter(typeof(IBinaryWriter));
+            var writeExpr = Expression.Invoke(write, Expression.Call(writerParam, MthdGetRawWriter), fldExpr);
+
+            // Compile and return
+            return Expression.Lambda<BinaryReflectiveWriteAction>(writeExpr, targetParam, writerParam).Compile();
+        }
+
+        /// <summary>
         /// Gets the writer with a specified generic method.
         /// </summary>
-        private static BinaryReflectiveWriteAction GetWriter(FieldInfo field, MethodInfo method, 
+        private static BinaryReflectiveWriteAction GetWriter(FieldInfo field, MethodInfo method,
+            params Type[] genericArgs)
+        {
+            return GetWriter0(field, method, false, genericArgs);
+        }
+
+        /// <summary>
+        /// Gets the raw writer with a specified generic method.
+        /// </summary>
+        private static BinaryReflectiveWriteAction GetRawWriter(FieldInfo field, MethodInfo method,
+            params Type[] genericArgs)
+        {
+            return GetWriter0(field, method, true, genericArgs);
+        }
+
+        /// <summary>
+        /// Gets the writer (raw or named-field, depending on the raw flag) with a specified generic method.
+        /// </summary>
+        private static BinaryReflectiveWriteAction GetWriter0(FieldInfo field, MethodInfo method, bool raw, 
             params Type[] genericArgs)
         {
             Debug.Assert(field != null);
             Debug.Assert(field.DeclaringType != null);   // non-static
+            Debug.Assert(method != null);
 
             if (genericArgs.Length == 0)
                 genericArgs = new[] {field.FieldType};
@@ -370,10 +575,14 @@ namespace Apache.Ignite.Core.Impl.Binary
             var fldExpr = Expression.Field(targetParamConverted, field);
 
             // Call Writer method
-            var writerParam = Expression.Parameter(typeof(IBinaryWriter));
-            var fldNameParam = Expression.Constant(BinaryUtils.CleanFieldName(field.Name));
+            var writerParam = Expression.Parameter(typeof (IBinaryWriter));
+
             var writeMethod = method.MakeGenericMethod(genericArgs);
-            var writeExpr = Expression.Call(writerParam, writeMethod, fldNameParam, fldExpr);
+
+            var writeExpr = raw
+                ? Expression.Call(Expression.Call(writerParam, MthdGetRawWriter), writeMethod, fldExpr)
+                : Expression.Call(writerParam, writeMethod, Expression.Constant(BinaryUtils.CleanFieldName(field.Name)),
+                    fldExpr);
 
             // Compile and return
             return Expression.Lambda<BinaryReflectiveWriteAction>(writeExpr, targetParam, writerParam).Compile();
@@ -405,9 +614,51 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
+        /// Gets the reader with a specified read action.
+        /// </summary>
+        private static BinaryReflectiveReadAction GetRawReader<T>(FieldInfo field, 
+            Expression<Func<IBinaryRawReader, T>> read)
+        {
+            Debug.Assert(field != null);
+            Debug.Assert(field.DeclaringType != null);   // non-static
+
+            // Call Reader method
+            var readerParam = Expression.Parameter(typeof(IBinaryReader));
+            Expression readExpr = Expression.Invoke(read, Expression.Call(readerParam, MthdGetRawReader));
+
+            if (typeof(T) != field.FieldType)
+                readExpr = Expression.Convert(readExpr, field.FieldType);
+
+            // Assign field value
+            var targetParam = Expression.Parameter(typeof(object));
+            var assignExpr = Expression.Call(DelegateConverter.GetWriteFieldMethod(field), targetParam, readExpr);
+
+            // Compile and return
+            return Expression.Lambda<BinaryReflectiveReadAction>(assignExpr, targetParam, readerParam).Compile();
+        }
+
+        /// <summary>
         /// Gets the reader with a specified generic method.
         /// </summary>
-        private static BinaryReflectiveReadAction GetReader(FieldInfo field, MethodInfo method, 
+        private static BinaryReflectiveReadAction GetReader(FieldInfo field, MethodInfo method,
+            params Type[] genericArgs)
+        {
+            return GetReader0(field, method, false, genericArgs);
+        }
+
+        /// <summary>
+        /// Gets the raw-mode reader (field value is read without a field name) with a specified generic method.
+        /// </summary>
+        private static BinaryReflectiveReadAction GetRawReader(FieldInfo field, MethodInfo method,
+            params Type[] genericArgs)
+        {
+            return GetReader0(field, method, true, genericArgs);
+        }
+
+        /// <summary>
+        /// Gets the reader with a specified generic method, in either raw or named-field mode.
+        /// </summary>
+        private static BinaryReflectiveReadAction GetReader0(FieldInfo field, MethodInfo method, bool raw, 
             params Type[] genericArgs)
         {
             Debug.Assert(field != null);
@@ -418,9 +669,10 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             // Call Reader method
             var readerParam = Expression.Parameter(typeof (IBinaryReader));
-            var fldNameParam = Expression.Constant(BinaryUtils.CleanFieldName(field.Name));
             var readMethod = method.MakeGenericMethod(genericArgs);
-            Expression readExpr = Expression.Call(readerParam, readMethod, fldNameParam);
+            Expression readExpr = raw
+                ? Expression.Call(Expression.Call(readerParam, MthdGetRawReader), readMethod)
+                : Expression.Call(readerParam, readMethod, Expression.Constant(BinaryUtils.CleanFieldName(field.Name)));
 
             if (readMethod.ReturnType != field.FieldType)
                 readExpr = Expression.Convert(readExpr, field.FieldType);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs
deleted file mode 100644
index ceffef5..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReflectiveSerializer.cs
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Binary
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Reflection;
-    using Apache.Ignite.Core.Binary;
-
-    /// <summary>
-    /// Binary serializer which reflectively writes all fields except of ones with 
-    /// <see cref="System.NonSerializedAttribute"/>.
-    /// <para />
-    /// Note that Java platform stores dates as a difference between current time 
-    /// and predefined absolute UTC date. Therefore, this difference is always the 
-    /// same for all time zones. .Net, in contrast, stores dates as a difference 
-    /// between current time and some predefined date relative to the current time 
-    /// zone. It means that this difference will be different as you change time zones. 
-    /// To overcome this discrepancy Ignite always converts .Net date to UTC form 
-    /// before serializing and allows user to decide whether to deserialize them 
-    /// in UTC or local form using <c>ReadTimestamp(..., true/false)</c> methods in 
-    /// <see cref="IBinaryReader"/> and <see cref="IBinaryRawReader"/>.
-    /// This serializer always read dates in UTC form. It means that if you have
-    /// local date in any field/property, it will be implicitly converted to UTC
-    /// form after the first serialization-deserialization cycle. 
-    /// </summary>
-    internal class BinaryReflectiveSerializer : IBinarySerializer
-    {
-        /** Cached binding flags. */
-        private const BindingFlags Flags = BindingFlags.Instance | BindingFlags.Public | 
-            BindingFlags.NonPublic | BindingFlags.DeclaredOnly;
-
-        /** Cached type descriptors. */
-        private readonly IDictionary<Type, Descriptor> _types = new Dictionary<Type, Descriptor>();
-
-        /// <summary>
-        /// Write portalbe object.
-        /// </summary>
-        /// <param name="obj">Object.</param>
-        /// <param name="writer">Writer.</param>
-        /// <exception cref="BinaryObjectException">Type is not registered in serializer:  + type.Name</exception>
-        public void WriteBinary(object obj, IBinaryWriter writer)
-        {
-            var binarizable = obj as IBinarizable;
-
-            if (binarizable != null)
-                binarizable.WriteBinary(writer);
-            else
-                GetDescriptor(obj).Write(obj, writer);
-        }
-
-        /// <summary>
-        /// Read binary object.
-        /// </summary>
-        /// <param name="obj">Instantiated empty object.</param>
-        /// <param name="reader">Reader.</param>
-        /// <exception cref="BinaryObjectException">Type is not registered in serializer:  + type.Name</exception>
-        public void ReadBinary(object obj, IBinaryReader reader)
-        {
-            var binarizable = obj as IBinarizable;
-            
-            if (binarizable != null)
-                binarizable.ReadBinary(reader);
-            else
-                GetDescriptor(obj).Read(obj, reader);
-        }
-
-        /// <summary>Register type.</summary>
-        /// <param name="type">Type.</param>
-        /// <param name="typeId">Type ID.</param>
-        /// <param name="converter">Name converter.</param>
-        /// <param name="idMapper">ID mapper.</param>
-        public void Register(Type type, int typeId, IBinaryNameMapper converter,
-            IBinaryIdMapper idMapper)
-        {
-            if (type.GetInterface(typeof(IBinarizable).Name) != null)
-                return;
-
-            List<FieldInfo> fields = new List<FieldInfo>();
-
-            Type curType = type;
-
-            while (curType != null)
-            {
-                foreach (FieldInfo field in curType.GetFields(Flags))
-                {
-                    if (!field.IsNotSerialized)
-                        fields.Add(field);
-                }
-
-                curType = curType.BaseType;
-            }
-
-            IDictionary<int, string> idMap = new Dictionary<int, string>();
-
-            foreach (FieldInfo field in fields)
-            {
-                string fieldName = BinaryUtils.CleanFieldName(field.Name);
-
-                int fieldId = BinaryUtils.FieldId(typeId, fieldName, converter, idMapper);
-
-                if (idMap.ContainsKey(fieldId))
-                {
-                    throw new BinaryObjectException("Conflicting field IDs [type=" +
-                        type.Name + ", field1=" + idMap[fieldId] + ", field2=" + fieldName +
-                        ", fieldId=" + fieldId + ']');
-                }
-                
-                idMap[fieldId] = fieldName;
-            }
-
-            fields.Sort(Compare);
-
-            Descriptor desc = new Descriptor(fields);
-
-            _types[type] = desc;
-        }
-
-        /// <summary>
-        /// Gets the descriptor for an object.
-        /// </summary>
-        private Descriptor GetDescriptor(object obj)
-        {
-            var type = obj.GetType();
-
-            Descriptor desc;
-
-            if (!_types.TryGetValue(type, out desc))
-                throw new BinaryObjectException("Type is not registered in serializer: " + type.Name);
-
-            return desc;
-        }
-        
-        /// <summary>
-        /// Compare two FieldInfo instances. 
-        /// </summary>
-        private static int Compare(FieldInfo info1, FieldInfo info2) {
-            string name1 = BinaryUtils.CleanFieldName(info1.Name);
-            string name2 = BinaryUtils.CleanFieldName(info2.Name);
-
-            return string.Compare(name1, name2, StringComparison.OrdinalIgnoreCase);
-        }
-
-        /// <summary>
-        /// Type descriptor. 
-        /// </summary>
-        private class Descriptor
-        {
-            /** Write actions to be performed. */
-            private readonly List<BinaryReflectiveWriteAction> _wActions;
-
-            /** Read actions to be performed. */
-            private readonly List<BinaryReflectiveReadAction> _rActions;
-
-            /// <summary>
-            /// Constructor.
-            /// </summary>
-            /// <param name="fields">Fields.</param>
-            public Descriptor(List<FieldInfo> fields)
-            {
-                _wActions = new List<BinaryReflectiveWriteAction>(fields.Count);
-                _rActions = new List<BinaryReflectiveReadAction>(fields.Count);
-
-                foreach (FieldInfo field in fields)
-                {
-                    BinaryReflectiveWriteAction writeAction;
-                    BinaryReflectiveReadAction readAction;
-
-                    BinaryReflectiveActions.TypeActions(field, out writeAction, out readAction);
-
-                    _wActions.Add(writeAction);
-                    _rActions.Add(readAction);
-                }
-            }
-
-            /// <summary>
-            /// Write object.
-            /// </summary>
-            /// <param name="obj">Object.</param>
-            /// <param name="writer">Writer.</param>
-            public void Write(object obj, IBinaryWriter writer)
-            {
-                int cnt = _wActions.Count;
-
-                for (int i = 0; i < cnt; i++)
-                    _wActions[i](obj, writer);                   
-            }
-
-            /// <summary>
-            /// Read object.
-            /// </summary>
-            /// <param name="obj">Object.</param>
-            /// <param name="reader">Reader.</param>
-            public void Read(object obj, IBinaryReader reader)
-            {
-                int cnt = _rActions.Count;
-
-                for (int i = 0; i < cnt; i++ )
-                    _rActions[i](obj, reader);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cba4f4c0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
index a7ce544..5b1273e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
@@ -950,10 +950,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             WriteFieldId(fieldName, BinaryUtils.TypeCollection);
 
-            if (val == null)
-                WriteNullField();
-            else
-                WriteCollection(val);
+            WriteCollection(val);
         }
 
         /// <summary>
@@ -962,8 +959,13 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="val">Collection.</param>
         public void WriteCollection(ICollection val)
         {
-            WriteByte(BinaryUtils.TypeCollection);
-            BinaryUtils.WriteCollection(val, this);
+            if (val == null)
+                WriteNullField();
+            else
+            {
+                WriteByte(BinaryUtils.TypeCollection);
+                BinaryUtils.WriteCollection(val, this);
+            }
         }
 
         /// <summary>
@@ -975,10 +977,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             WriteFieldId(fieldName, BinaryUtils.TypeDictionary);
 
-            if (val == null)
-                WriteNullField();
-            else
-                WriteDictionary(val);
+            WriteDictionary(val);
         }
 
         /// <summary>
@@ -987,8 +986,13 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <param name="val">Dictionary.</param>
         public void WriteDictionary(IDictionary val)
         {
-            WriteByte(BinaryUtils.TypeDictionary);
-            BinaryUtils.WriteDictionary(val, this);
+            if (val == null)
+                WriteNullField();
+            else
+            {
+                WriteByte(BinaryUtils.TypeDictionary);
+                BinaryUtils.WriteDictionary(val, this);
+            }
         }
 
         /// <summary>


[09/10] ignite git commit: IGNITE-2320 Deprecated setMarshallerCachePoolSize() and created new setMarshallerCacheThreadPoolSize() to be consistent with getMarshallerCacheThreadPoolSize().

Posted by sb...@apache.org.
IGNITE-2320 Deprecated setMarshallerCachePoolSize() and created new setMarshallerCacheThreadPoolSize() to be consistent with getMarshallerCacheThreadPoolSize().


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c5db210
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c5db210
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c5db210

Branch: refs/heads/ignite-1232
Commit: 7c5db2102fb701a24e03f50232a5012122672d32
Parents: baa1312
Author: vsisko <vs...@gridgain.com>
Authored: Fri Mar 4 10:01:59 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Mar 4 10:01:59 2016 +0700

----------------------------------------------------------------------
 .../ignite/configuration/IgniteConfiguration.java     | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7c5db210/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index f705638..758a2b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -891,8 +891,22 @@ public class IgniteConfiguration {
      * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize()
      * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime()
      * @return {@code this} for chaining.
+     * @deprecated Use {@link #setMarshallerCacheThreadPoolSize(int)} instead.
      */
+    @Deprecated
     public IgniteConfiguration setMarshallerCachePoolSize(int poolSize) {
+        return setMarshallerCacheThreadPoolSize(poolSize);
+    }
+
+    /**
+     * Sets default thread pool size that will be used to process marshaller messages.
+     *
+     * @param poolSize Default executor service size to use for marshaller messages.
+     * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize()
+     * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime()
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setMarshallerCacheThreadPoolSize(int poolSize) {
         marshCachePoolSize = poolSize;
 
         return this;


[03/10] ignite git commit: IGNITE-799 Fixed Desktop API is not supported on the current platform.

Posted by sb...@apache.org.
IGNITE-799 Fixed Desktop API is not supported on the current platform.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28ea3433
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28ea3433
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28ea3433

Branch: refs/heads/ignite-1232
Commit: 28ea3433bfbe64cd5166adc29c1badd8d601761e
Parents: cba4f4c
Author: vsisko <vs...@gridgain.com>
Authored: Thu Mar 3 16:17:57 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 3 16:17:57 2016 +0700

----------------------------------------------------------------------
 .../ignite/schema/ui/SchemaImportApp.java       | 205 ++++++++++++++++++-
 1 file changed, 202 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/28ea3433/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
----------------------------------------------------------------------
diff --git a/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java b/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
index 6f9e05b..98ac357 100644
--- a/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
+++ b/modules/schema-import/src/main/java/org/apache/ignite/schema/ui/SchemaImportApp.java
@@ -17,14 +17,19 @@
 
 package org.apache.ignite.schema.ui;
 
+import java.awt.Desktop;
 import java.io.BufferedInputStream;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.SQLException;
@@ -39,6 +44,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javafx.application.Application;
 import javafx.application.Platform;
 import javafx.beans.value.ChangeListener;
@@ -75,6 +82,7 @@ import javafx.stage.FileChooser;
 import javafx.stage.Screen;
 import javafx.stage.Stage;
 import javafx.util.Callback;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.schema.generator.CodeGenerator;
 import org.apache.ignite.schema.generator.XmlGenerator;
@@ -119,6 +127,9 @@ public class SchemaImportApp extends Application {
     /** Logger. */
     private static final Logger log = Logger.getLogger(SchemaImportApp.class.getName());
 
+    /** Ability to use xdg-open utility flag. */
+    private static final boolean HAS_XDG_OPEN = U.isUnix() && new File("/usr/bin/xdg-open").canExecute();
+
     /** Presets for database settings. */
     private static class Preset {
         /** Name in preferences. */
@@ -661,6 +672,194 @@ public class SchemaImportApp extends Application {
                 return null;
             }
 
+            /**
+             * Reads the first line that the given process prints to its standard output.
+             *
+             * @param cmdLine Started process whose output should be read.
+             * @return First line of the process output, or {@code null} if nothing was printed or reading failed.
+             */
+            private String execAndReadLine(Process cmdLine) {
+                // Charset.defaultCharset() never returns null, so the reader can always be given an explicit charset.
+                InputStreamReader out = new InputStreamReader(cmdLine.getInputStream(), Charset.defaultCharset());
+
+                try (BufferedReader reader = new BufferedReader(out)) {
+                    return reader.readLine();
+                }
+                catch (IOException ignored) {
+                    // Best effort: a read failure is reported the same way as empty output.
+                    return null;
+                }
+            }
+
+            /**
+             * Starts the specified command in a separate process.
+             * @param commands Executable file followed by its parameters.
+             * @return Started process.
+             * @throws IOException If the process could not be started.
+             */
+            private Process startProcess(List<String> commands) throws IOException {
+                ProcessBuilder builder = new ProcessBuilder(commands);
+
+                Map<String, String> environment = builder.environment();
+                // Reset the child environment to exactly the environment of the current process.
+                environment.clear();
+
+                environment.putAll(System.getenv());
+
+                return builder.start();
+            }
+
+            /**
+             * Builds the argument list for a new process: the executable followed by its parameters.
+             * The executable path is normalized to the platform file separator; on Windows embedded
+             * double quotes are backslash-escaped and empty parameters become {@code ""}.
+             * @param cmd Path to executable file.
+             * @param parameters Parameters to pass to the executable.
+             * @return Executable and parameters as a single list.
+             */
+            private List<String> toCommandLine(String cmd, String... parameters) {
+                List<String> cmdLine = new ArrayList<>(parameters.length + 1);
+
+                cmdLine.add(cmd.replace('/', File.separatorChar).replace('\\', File.separatorChar));
+
+                boolean escape = U.isWindows();
+
+                for (String arg : parameters) {
+                    if (escape && arg.isEmpty())
+                        cmdLine.add("\"\"");
+                    else if (escape)
+                        cmdLine.add(arg.replace("\"", "\\\""));
+                    else
+                        cmdLine.add(arg);
+                }
+
+                return cmdLine;
+            }
+
+            /**
+             * Starts a process for the given executable; fails with IllegalArgumentException if no executable is given.
+             * @param execPath Path to executable file.
+             * @param params Parameters to pass to the executable.
+             * @return Started process.
+             * @throws IOException If the process could not be started.
+             */
+            private Process createProcess(String execPath, String... params) throws IOException {
+                if (F.isEmpty(execPath))
+                    throw new IllegalArgumentException("Executable not specified");
+
+                return startProcess(toCommandLine(execPath, params));
+            }
+
+            /**
+             * Compares two version strings part by part, splitting on '.', '_' and '-'.
+             * Numeric parts are compared as numbers, other parts lexicographically; missing trailing parts count as zero.
+             *
+             * @param v1 Version string 1, may be {@code null}.
+             * @param v2 Version string 2, may be {@code null}.
+             * @return Zero if the versions are equal, a negative value if {@code v1} is lower than {@code v2},
+             *      a positive value if {@code v1} is greater. {@code null} is lower than any non-null version.
+             */
+            private int compareVersionNumbers(String v1, String v2) {
+                if (v1 == null && v2 == null)
+                    return 0;
+
+                if (v1 == null)
+                    return -1;
+
+                if (v2 == null)
+                    return 1;
+
+                String[] part1 = v1.split("[._-]");
+                String[] part2 = v2.split("[._-]");
+
+                int idx = 0;
+
+                while (idx < part1.length && idx < part2.length) {
+                    String p1 = part1[idx];
+                    String p2 = part2[idx];
+
+                    // BigInteger instead of Integer: a long digit run (e.g. a build timestamp) must not throw.
+                    int cmp = p1.matches("\\d+") && p2.matches("\\d+") ?
+                        new java.math.BigInteger(p1).compareTo(new java.math.BigInteger(p2)) : p1.compareTo(p2);
+
+                    if (cmp != 0)
+                        return cmp;
+
+                    idx += 1;
+                }
+
+                // At most one version has parts left. It is the greater one unless all of its
+                // remaining parts are numeric zeros; with equal lengths the loop below is skipped.
+                boolean left = part1.length > idx;
+                String[] parts = left ? part1 : part2;
+
+                while (idx < parts.length) {
+                    String p = parts[idx];
+
+                    int cmp = p.matches("\\d+") ? new java.math.BigInteger(p).signum() : 1;
+
+                    if (cmp != 0)
+                        return left ? cmp : -cmp;
+
+                    idx += 1;
+                }
+
+                return 0;
+            }
+
+            /**
+             * Checks whether Nautilus 3+ is the default handler for directories and can be used to open a folder.
+             * @return {@code True} when Nautilus can be used or {@code false} otherwise.
+             * @throws IOException If a helper process ({@code xdg-mime} or {@code nautilus}) could not be started.
+             */
+            private boolean canUseNautilus() throws IOException {
+                // All three conditions are required: with '||' the probes below were launched on any Unix,
+                // and a missing xdg-mime/nautilus binary surfaced as an IOException instead of a fallback.
+                if (!U.isUnix() || !new File("/usr/bin/xdg-mime").canExecute() || !new File("/usr/bin/nautilus").canExecute())
+                    return false;
+
+                String appName = execAndReadLine(createProcess("xdg-mime", "query", "default", "inode/directory"));
+
+                if (appName == null || !appName.matches("nautilus.*\\.desktop"))
+                    return false;
+
+                String ver = execAndReadLine(createProcess("nautilus", "--version"));
+
+                if (ver == null)
+                    return false;
+
+                // Only version 3 or newer is accepted.
+                Matcher m = Pattern.compile("GNOME nautilus ([0-9.]+)").matcher(ver);
+
+                return m.find() && compareVersionNumbers(m.group(1), "3") >= 0;
+            }
+
+            /**
+             * Opens the specified folder in the system file manager, picking the launcher by platform.
+             *
+             * @param dir Folder to open.
+             * @throws IOException If a helper or file manager process could not be started.
+             */
+            private void openFolder(File dir) throws IOException {
+                String path = dir.getAbsolutePath();
+
+                if (U.isWindows())
+                    Runtime.getRuntime().exec("explorer /root," + path);
+                else if (U.isMacOs())
+                    createProcess("open", path);
+                else if (canUseNautilus())
+                    createProcess("nautilus", path);
+                else if (HAS_XDG_OPEN)
+                    createProcess("/usr/bin/xdg-open", path);
+                else if (Desktop.isDesktopSupported() && Desktop.getDesktop().isSupported(Desktop.Action.OPEN))
+                    Desktop.getDesktop().open(new File(path));
+                else {
+                    boolean unixLike = U.isLinux() || U.isUnix() || U.isSolaris();
+                    MessageBox.warningDialog(owner, "This action isn't supported on the current platform" +
+                        (unixLike ? ".\nTo fix this issue you should install library libgnome2-0." : ""));
+                }
+            }
+
             /** {@inheritDoc} */
             @Override protected void succeeded() {
                 super.succeeded();
@@ -670,9 +869,9 @@ public class SchemaImportApp extends Application {
                 if (MessageBox.confirmDialog(owner, "Generation complete!\n\n" +
                     "Reveal output folder in system default file browser?"))
                     try {
-                        java.awt.Desktop.getDesktop().open(destFolder);
+                        openFolder(destFolder);
                     }
-                    catch (IOException e) {
+                    catch (Exception e) {
                         MessageBox.errorDialog(owner, "Failed to open folder with results.", e);
                     }
             }
@@ -1550,7 +1749,7 @@ public class SchemaImportApp extends Application {
 
                 if (customPrefsFile == null)
                     log.log(Level.WARNING, "Failed to resolve path to file with custom preferences: " +
-                        customPrefsFile);
+                        customPrefsFileName);
                 else {
                     Properties customPrefs = new Properties();
 


[10/10] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-1232

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b9682c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b9682c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b9682c1

Branch: refs/heads/ignite-1232
Commit: 4b9682c1887f462ac29a9b679192acb8ed376b48
Parents: 4667293 7c5db21
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 4 11:05:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 4 11:05:40 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/ContinuousQuery.java     |  35 +
 .../configuration/IgniteConfiguration.java      |  14 +
 .../processors/cache/IgniteCacheProxy.java      |   4 +
 .../continuous/CacheContinuousQueryHandler.java |  86 ++-
 .../CacheContinuousQueryHandlerV2.java          | 176 +++++
 .../continuous/CacheContinuousQueryManager.java | 238 +++++--
 .../continuous/GridContinuousProcessor.java     |   7 +-
 .../processors/igfs/IgfsMetaManager.java        | 215 +++---
 .../processors/platform/PlatformContext.java    |   9 +-
 .../platform/PlatformContextImpl.java           |  13 +-
 .../platform/PlatformProcessorImpl.java         |  14 +-
 .../platform/compute/PlatformCompute.java       |  23 +-
 .../utils/PlatformConfigurationUtils.java       |   6 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |  75 +-
 .../cache/IgniteCacheEntryListenerTxTest.java   |   5 -
 .../GridCacheReplicatedPreloadSelfTest.java     |  39 +-
 .../CacheContinuousQueryFactoryFilterTest.java  | 714 +++++++++++++++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../CacheContinuousQueryOperationP2PTest.java   | 326 +++++++++
 ...acheContinuousQueryRandomOperationsTest.java |  63 +-
 .../platform/PlatformStartIgniteTask.java       |  77 ++
 .../ignite/platform/PlatformStopIgniteTask.java |  74 ++
 .../p2p/CacheDeploymentEntryEventFilter.java    |  33 +
 .../CacheDeploymentEntryEventFilterFactory.java |  31 +
 .../IgniteCacheQuerySelfTestSuite.java          |   4 +
 .../Apache.Ignite.Core.Tests.csproj             |   1 +
 .../Binary/BinarySelfTest.cs                    | 110 ++-
 .../Compute/MixedClusterTest.cs                 | 123 ++++
 .../Config/Compute/compute-grid1.xml            |   1 -
 .../Config/Compute/compute-grid2.xml            |  15 +-
 .../IgniteConfigurationTest.cs                  |  17 +-
 .../Apache.Ignite.Core.Tests/TestUtils.cs       |  33 +-
 .../Apache.Ignite.Core.csproj                   |   2 +-
 .../Binary/BinaryReflectiveSerializer.cs        | 241 +++++++
 .../Multicast/TcpDiscoveryMulticastIpFinder.cs  |   2 +-
 .../Impl/Binary/BinaryReflectiveActions.cs      | 576 ++++++++++-----
 .../Impl/Binary/BinaryReflectiveSerializer.cs   | 218 ------
 .../Impl/Binary/BinaryWriter.cs                 |  28 +-
 .../ignite/schema/ui/SchemaImportApp.java       | 205 +++++-
 39 files changed, 3170 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9682c1/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------


[05/10] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bcb8b52b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bcb8b52b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bcb8b52b

Branch: refs/heads/ignite-1232
Commit: bcb8b52b7638bb9cc9be6a3d732a59ef5f4b9494
Parents: b0e85fd 28ea343
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 3 12:57:12 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 3 12:57:12 2016 +0300

----------------------------------------------------------------------
 .../ignite/schema/ui/SchemaImportApp.java       | 205 ++++++++++++++++++-
 1 file changed, 202 insertions(+), 3 deletions(-)
----------------------------------------------------------------------