You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/30 13:47:37 UTC

[1/6] ignite git commit: IGNITE-6992: Hadoop: fixed MR problem with HDFS access when Kerberos is enabled. This closes #3097.

Repository: ignite
Updated Branches:
  refs/heads/ignite-zk c1e20330d -> 827b70854


IGNITE-6992: Hadoop: fixed MR problem with HDFS access when Kerberos is enabled. This closes #3097.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/438760ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/438760ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/438760ed

Branch: refs/heads/ignite-zk
Commit: 438760ed7f9d37bb72de5e5a38d46ce2450544f8
Parents: 5fa5ae7
Author: Evgenii Zhuravlev <e....@gmail.com>
Authored: Wed Nov 29 13:50:13 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:13 2017 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopDefaultJobInfo.java | 15 ++++-
 .../processors/hadoop/HadoopJobInfo.java        |  7 +++
 .../processors/hadoop/impl/HadoopUtils.java     | 45 +++++++++++++-
 .../impl/fs/HadoopFileSystemCacheUtils.java     | 34 ++++++-----
 .../hadoop/impl/proto/HadoopClientProtocol.java |  8 ++-
 .../processors/hadoop/impl/v2/HadoopV2Job.java  | 32 ++++++++--
 .../hadoop/impl/v2/HadoopV2TaskContext.java     | 62 +++++++++++++-------
 .../impl/HadoopAbstractMapReduceTest.java       |  2 +-
 .../hadoop/impl/HadoopGroupingTest.java         |  2 +-
 .../hadoop/impl/HadoopJobTrackerSelfTest.java   |  4 +-
 .../impl/HadoopMapReduceEmbeddedSelfTest.java   |  2 +-
 .../hadoop/impl/HadoopPlannerMockJob.java       |  6 ++
 .../hadoop/impl/HadoopSortingTest.java          |  4 +-
 .../impl/HadoopTaskExecutionSelfTest.java       | 10 ++--
 .../hadoop/impl/HadoopTasksV1Test.java          |  2 +-
 .../hadoop/impl/HadoopTasksV2Test.java          |  2 +-
 .../hadoop/impl/HadoopTeraSortTest.java         |  2 +-
 .../hadoop/impl/HadoopV2JobSelfTest.java        |  2 +-
 .../collections/HadoopAbstractMapTest.java      |  6 ++
 .../HadoopExternalTaskExecutionSelfTest.java    |  4 +-
 20 files changed, 189 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d4a29b2..a66f501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -52,6 +52,9 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
     /** User name. */
     private String user;
 
+    /** Credentials. */
+    private byte[] credentials;
+
     /**
      * Default constructor required by {@link Externalizable}.
      */
@@ -69,12 +72,13 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
      * @param props All other properties of the job.
      */
     public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
-        Map<String, String> props) {
+        Map<String, String> props, byte[] credentials) {
         this.jobName = jobName;
         this.user = user;
         this.hasCombiner = hasCombiner;
         this.numReduces = numReduces;
         this.props = props;
+        this.credentials = credentials;
     }
 
     /** {@inheritDoc} */
@@ -127,6 +131,11 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public byte[] credentials() {
+        return credentials;
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, jobName);
         U.writeString(out, user);
@@ -135,6 +144,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
         out.writeInt(numReduces);
 
         IgfsUtils.writeStringMap(out, props);
+
+        U.writeByteArray(out, credentials);
     }
 
     /** {@inheritDoc} */
@@ -146,6 +157,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
         numReduces = in.readInt();
 
         props = IgfsUtils.readStringMap(in);
+
+        credentials = U.readByteArray(in);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 4cc8f80..3dffbc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -69,6 +69,13 @@ public interface HadoopJobInfo {
     String user();
 
     /**
+     * Gets credentials.
+     *
+     * @return Credentials.
+     */
+    byte[] credentials();
+
+    /**
      * Creates new job instance for the given ID.
      * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution.
      * This method will be called once for the same ID on one node, though it can be called on the same host

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
index 767e10a..89c60b9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.impl;
 
 import com.google.common.primitives.Longs;
 import com.google.common.primitives.UnsignedBytes;
+import java.io.DataInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -27,6 +28,8 @@ import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
@@ -211,10 +214,12 @@ public class HadoopUtils {
      * Creates JobInfo from hadoop configuration.
      *
      * @param cfg Hadoop configuration.
+     * @param credentials Credentials.
      * @return Job info.
      * @throws IgniteCheckedException If failed.
      */
-    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
+    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg, byte[] credentials)
+        throws IgniteCheckedException {
         JobConf jobConf = new JobConf(cfg);
 
         boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
@@ -269,7 +274,8 @@ public class HadoopUtils {
         for (Map.Entry<String, String> entry : jobConf)
             props.put(entry.getKey(), entry.getValue());
 
-        return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
+        return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props,
+            credentials);
     }
 
     /**
@@ -394,4 +400,39 @@ public class HadoopUtils {
 
         return len1 - len2;
     }
+
+    /**
+     * Deserialization of Hadoop Writable object.
+     *
+     * @param writable Writable object to deserialize to.
+     * @param bytes byte array to deserialize.
+     */
+    public static void deserialize(Writable writable, byte[] bytes) throws IOException {
+        DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
+
+        writable.readFields(dataIn);
+
+        dataIn.close();
+    }
+
+    /**
+     * Create UserGroupInformation for specified user and credentials.
+     *
+     * @param user User.
+     * @param credentialsBytes Credentials byte array.
+     */
+    public static UserGroupInformation createUGI(String user, byte[] credentialsBytes) throws IOException {
+        Credentials credentials = new Credentials();
+
+        HadoopUtils.deserialize(credentials, credentialsBytes);
+
+        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+
+        ugi.addCredentials(credentials);
+
+        if (credentials.numberOfTokens() > 0)
+            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN);
+
+        return ugi;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
index 0b673e9..f48d21d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.impl.fs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.util.GridStringBuilder;
@@ -41,27 +42,32 @@ public class HadoopFileSystemCacheUtils {
         return new HadoopLazyConcurrentMap<>(
             new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
                 @Override public FileSystem createValue(FsCacheKey key) throws IOException {
-                    try {
-                        assert key != null;
+                    assert key != null;
 
-                        // Explicitly disable FileSystem caching:
-                        URI uri = key.uri();
+                    // Explicitly disable FileSystem caching:
+                    URI uri = key.uri();
 
-                        String scheme = uri.getScheme();
+                    String scheme = uri.getScheme();
 
-                        // Copy the configuration to avoid altering the external object.
-                        Configuration cfg = new Configuration(key.configuration());
+                    // Copy the configuration to avoid altering the external object.
+                    Configuration cfg = new Configuration(key.configuration());
 
-                        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+                    String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
 
-                        cfg.setBoolean(prop, true);
+                    cfg.setBoolean(prop, true);
 
-                        return FileSystem.get(uri, cfg, key.user());
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
+                    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() ==
+                        UserGroupInformation.AuthenticationMethod.TOKEN)
+                        return FileSystem.get(uri, cfg);
+                    else {
+                        try {
+                            return FileSystem.get(uri, cfg, key.user());
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
 
-                        throw new IOException("Failed to create file system due to interrupt.", e);
+                            throw new IOException("Failed to create file system due to interrupt.", e);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
index 7fc0e77..811b0c2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -122,8 +123,13 @@ public class HadoopClientProtocol implements ClientProtocol {
         try {
             conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
 
+            byte[] credentials = null;
+
+            if (ts != null)
+                credentials = WritableUtils.toByteArray(ts);
+
             HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class,
-                jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf));
+                jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf, credentials));
 
             if (status == null)
                 throw new IOException("Failed to submit job (null status obtained): " + jobId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index 2a85cb8..28b4d6b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop.impl.v2;
 
+import java.security.PrivilegedExceptionAction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Splitter;
@@ -317,7 +320,7 @@ public class HadoopV2Job extends HadoopJobEx {
     }
 
     /** {@inheritDoc} */
-    @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
+    @Override public void initialize(final boolean external, final UUID locNodeId) throws IgniteCheckedException {
         assert locNodeId != null;
 
         this.locNodeId = locNodeId;
@@ -325,15 +328,36 @@ public class HadoopV2Job extends HadoopJobEx {
         ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
 
         try {
-            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+            if (jobInfo.credentials() == null)
+                rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+            else {
+                UserGroupInformation ugi = HadoopUtils.createUGI(jobInfo.user(), jobInfo.credentials());
+
+                try {
+                    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                        @Override public Void run() throws Exception {
+                            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId,
+                                jobId));
+
+                            return null;
+                        }
+                    });
+                }
+                catch (IOException | InterruptedException e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
 
             if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true)) {
-                U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() + " job property is set to true; please disable " +
-                    "it if job tasks rely on mutable static state.");
+                U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() +
+                    " job property is set to true; please disable " + "it if job tasks rely on mutable static state.");
 
                 sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId));
             }
         }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
         finally {
             HadoopCommonUtils.restoreContextClassLoader(oldLdr);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 6127822..c362b0c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
@@ -548,41 +549,58 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
 
     /** {@inheritDoc} */
     @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
-        String user = job.info().user();
+        if (job.info().credentials() == null) {
+            String user = job.info().user();
 
-        user = IgfsUtils.fixUserName(user);
+            user = IgfsUtils.fixUserName(user);
 
-        assert user != null;
+            assert user != null;
 
-        String ugiUser;
+            String ugiUser;
 
-        try {
-            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+            try {
+                UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
 
-            assert currUser != null;
+                assert currUser != null;
 
-            ugiUser = currUser.getShortUserName();
-        }
-        catch (IOException ioe) {
-            throw new IgniteCheckedException(ioe);
-        }
+                ugiUser = currUser.getShortUserName();
+            }
+            catch (IOException ioe) {
+                throw new IgniteCheckedException(ioe);
+            }
 
-        try {
-            if (F.eq(user, ugiUser))
-                // if current UGI context user is the same, do direct call:
-                return c.call();
-            else {
-                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+            try {
+                if (F.eq(user, ugiUser))
+                    // if current UGI context user is the same, do direct call:
+                    return c.call();
+                else {
+                    UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                    return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                        @Override public T run() throws Exception {
+                            return c.call();
+                        }
+                    });
+                }
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+        else {
+            try {
+                UserGroupInformation ugi = HadoopUtils.createUGI(job.info().user(), job.info().credentials());
 
                 return ugi.doAs(new PrivilegedExceptionAction<T>() {
-                    @Override public T run() throws Exception {
+                    @Override
+                    public T run() throws Exception {
                         return c.call();
                     }
                 });
             }
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException(e);
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 4928e3d..fc6d7f8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -207,7 +207,7 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
 
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
         fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
index 2de2d19..d27a234 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
@@ -127,7 +127,7 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest {
         }
 
         grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
-            createJobInfo(job.getConfiguration())).get(30000);
+            createJobInfo(job.getConfiguration(), null)).get(30000);
 
         assertTrue(HadoopGroupingTestState.values().isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
index 381652e..c3b3040 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -121,7 +121,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
 
             HadoopJobId jobId = new HadoopJobId(globalId, 1);
 
-            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
             checkStatus(jobId, false);
 
@@ -168,7 +168,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
 
             HadoopJobId jobId = new HadoopJobId(globalId, 1);
 
-            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
             checkStatus(jobId, false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 6eb16af..21b7ee2 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
             job.setJarByClass(HadoopWordCount2.class);
 
             IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-                    createJobInfo(job.getConfiguration()));
+                    createJobInfo(job.getConfiguration(), null));
 
             fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
index 28c8264..b3368bd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
@@ -178,5 +178,11 @@ public class HadoopPlannerMockJob extends HadoopJobEx {
 
             return null;
         }
+
+        @Override public byte[] credentials() {
+            throwUnsupported();
+
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
index 2e85cce..bb11ccb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
@@ -117,7 +117,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
         X.printerrln("Data generation started.");
 
         grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration())).get(180000);
+            createJobInfo(job.getConfiguration(), null)).get(180000);
 
         X.printerrln("Data generation complete.");
 
@@ -148,7 +148,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
         X.printerrln("Job started.");
 
         grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
-            createJobInfo(job.getConfiguration())).get(180000);
+            createJobInfo(job.getConfiguration(), null)).get(180000);
 
         X.printerrln("Job complete.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
index c27a67f..2394ada 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
         job.setJarByClass(getClass());
 
         IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-                createJobInfo(job.getConfiguration()));
+                createJobInfo(job.getConfiguration(), null));
 
         fut.get();
 
@@ -188,7 +188,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
 
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
         fut.get();
 
@@ -226,7 +226,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
         job.setJarByClass(getClass());
 
         final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3),
-                createJobInfo(job.getConfiguration()));
+                createJobInfo(job.getConfiguration(), null));
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -313,7 +313,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
 
-        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg, null));
 
         if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
@@ -364,7 +364,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
         assertFalse(killRes);
 
-        final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
+        final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg, null));
 
         if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
index 1d7f3e4..ca96551 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
@@ -46,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
 
         setupFileSystems(jobConf);
 
-        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf, null);
 
         UUID uuid = new UUID(0, 0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
index 61e3e46..0fcd358 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
@@ -65,7 +65,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
 
         Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile);
 
-        HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
+        HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration(), null);
 
         UUID uuid = new UUID(0, 0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index d8b74ce..46752a8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -178,7 +178,7 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
 
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
         fut.get();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
index 2c2f049..041f0bc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
@@ -80,7 +80,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
         cfg.setMapOutputValueClass(Text.class);
         cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
-        HadoopDefaultJobInfo info = createJobInfo(cfg);
+        HadoopDefaultJobInfo info = createJobInfo(cfg, null);
 
         final UUID uuid = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
index b9dcae1..49be0a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
@@ -177,5 +177,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
 
             return null;
         }
+
+        @Override public byte[] credentials() {
+            assert false;
+
+            return null;
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
index 0afd689..1246078 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -117,7 +117,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
         job.setJarByClass(getClass());
 
         IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration()));
+            createJobInfo(job.getConfiguration(), null));
 
         fut.get();
     }
@@ -153,7 +153,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
         job.setJarByClass(getClass());
 
         IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration()));
+            createJobInfo(job.getConfiguration(), null));
 
         try {
             fut.get();


[4/6] ignite git commit: IGNITE-6335 .NET: Thin client: cache binary mode

Posted by sb...@apache.org.
IGNITE-6335 .NET: Thin client: cache binary mode

This closes #3114


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35e621fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35e621fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35e621fe

Branch: refs/heads/ignite-zk
Commit: 35e621fec2c2bc43206ec1f35a7de4ef7faad97f
Parents: c2b145a
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Nov 30 10:10:12 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Nov 30 10:10:12 2017 +0300

----------------------------------------------------------------------
 .../binary/ClientBinaryTypeGetRequest.java      |   4 +-
 .../binary/ClientBinaryTypeGetResponse.java     |  10 +-
 .../Apache.Ignite.Core.Tests.DotNetCore.csproj  |   2 +
 .../Apache.Ignite.Core.Tests.csproj             |   2 +
 .../Cache/CacheAbstractTest.cs                  |  26 +
 .../Client/Cache/BinaryBuilderTest.cs           | 118 +++++
 .../Client/Cache/CacheTest.cs                   |   2 +-
 .../Client/Cache/CacheTestKeepBinary.cs         | 499 +++++++++++++++++++
 .../Client/Cache/ScanQueryTest.cs               |  48 ++
 .../Client/ClientTestBase.cs                    |  43 ++
 .../Client/Cache/ICacheClient.cs                |  11 +
 .../Apache.Ignite.Core/Client/IIgniteClient.cs  |   9 +
 .../Impl/Binary/BinaryObjectBuilder.cs          |   5 +-
 .../Impl/Binary/BinaryProcessorClient.cs        |   7 +-
 .../Impl/Client/Cache/CacheClient.cs            |  30 +-
 .../Impl/Client/IgniteClient.cs                 |   7 +-
 16 files changed, 804 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
index 72f9f58..06cb52c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
@@ -43,8 +43,8 @@ public class ClientBinaryTypeGetRequest extends ClientRequest {
 
     /** {@inheritDoc} */
     @Override public ClientResponse process(ClientConnectionContext ctx) {
-        BinaryTypeImpl type = (BinaryTypeImpl)ctx.kernalContext().cacheObjects().binary().type( typeId);
+        BinaryTypeImpl type = (BinaryTypeImpl)ctx.kernalContext().cacheObjects().binary().type(typeId);
 
-        return new ClientBinaryTypeGetResponse(requestId(), type.metadata());
+        return new ClientBinaryTypeGetResponse(requestId(), type != null ? type.metadata() : null);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
index e888305..5a47c73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
@@ -37,8 +37,6 @@ public class ClientBinaryTypeGetResponse extends ClientResponse {
     ClientBinaryTypeGetResponse(long requestId, BinaryMetadata meta) {
         super(requestId);
 
-        assert meta != null;
-
         this.meta = meta;
     }
 
@@ -46,6 +44,12 @@ public class ClientBinaryTypeGetResponse extends ClientResponse {
     @Override public void encode(BinaryRawWriterEx writer) {
         super.encode(writer);
 
-        PlatformUtils.writeBinaryMetadata(writer, meta, true);
+        if (meta != null) {
+            writer.writeBoolean(true);  // Not null.
+
+            PlatformUtils.writeBinaryMetadata(writer, meta, true);
+        } else {
+            writer.writeBoolean(false);  // Null.
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
index 0acaab8..5d735eb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
@@ -58,7 +58,9 @@
     <Compile Include="..\Apache.Ignite.Core.Tests\Cache\Query\Linq\CacheLinqTestSimpleName.cs" Link="Cache\Query\Linq\CacheLinqTestSimpleName.cs" />
     <Compile Include="..\Apache.Ignite.Core.Tests\Cache\Query\Linq\CacheLinqTestSqlEscapeAll.cs" Link="Cache\Query\Linq\CacheLinqTestSqlEscapeAll.cs" />
     <Compile Include="..\Apache.Ignite.Core.Tests\Cache\TestReferenceObject.cs" Link="Cache\TestReferenceObject.cs" />
+    <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\BinaryBuilderTest.cs" Link="ThinClient\Cache\BinaryBuilderTest.cs" />
     <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\CacheTest.cs" Link="ThinClient\Cache\CacheTest.cs" />
+    <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\CacheTestKeepBinary.cs" Link="ThinClient\Cache\CacheTestKeepBinary.cs" />
     <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\EmptyObject.cs" Link="ThinClient\Cache\EmptyObject.cs" />
     <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\Person.cs" Link="ThinClient\Cache\Person.cs" />
     <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\ScanQueryTest.cs" Link="ThinClient\Cache\ScanQueryTest.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 3951b40..8bd8f28 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -106,7 +106,9 @@
     <Compile Include="Cache\Query\Linq\CacheLinqTest.Contains.cs" />
     <Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" />
     <Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" />
+    <Compile Include="Client\Cache\BinaryBuilderTest.cs" />
     <Compile Include="Client\Cache\CacheTest.cs" />
+    <Compile Include="Client\Cache\CacheTestKeepBinary.cs" />
     <Compile Include="Client\Cache\CacheTestNoMeta.cs" />
     <Compile Include="Client\Cache\ClientCacheConfigurationTest.cs" />
     <Compile Include="Client\Cache\EmptyObject.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
index d54363a..fcce021 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
@@ -515,6 +515,32 @@ namespace Apache.Ignite.Core.Tests.Cache
         }
 
         [Test]
+        [Ignore("IGNITE-7072")]
+        public void TestReplaceBinary()
+        {
+            var cache = Cache<object, object>();
+            var key = new {Foo = "bar"};
+            var val = new {Bar = "baz", Id = 1};
+            var val2 = new {Bar = "baz2", Id = 2};
+            var val3 = new {Bar = "baz3", Id = 3};
+
+            Assert.IsFalse(cache.ContainsKey(key));
+            Assert.AreEqual(false, cache.Replace(key, val));
+            Assert.IsFalse(cache.ContainsKey(key));
+
+            cache.Put(key, val);
+            Assert.AreEqual(val, cache.Get(key));
+            Assert.IsTrue(cache.Replace(key, val2));
+            Assert.AreEqual(val2, cache.Get(key));
+
+            Assert.IsFalse(cache.Replace(key, -1, 3));
+            Assert.AreEqual(val2, cache.Get(key));
+
+            Assert.IsTrue(cache.Replace(key, val2, val3));
+            Assert.AreEqual(val3, cache.Get(key));
+        }
+
+        [Test]
         public void TestGetAndReplaceAsync()
         {
             var cache = Cache().WrapAsync();

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs
new file mode 100644
index 0000000..76c57f6
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cache.Configuration;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests binary object builder in thin client.
+    /// </summary>
+    public class BinaryBuilderTest : ClientTestBase
+    {
+        /// <summary>
+        /// Tests the classless object builder.
+        /// </summary>
+        [Test]
+        public void TestClasslessBuilder()
+        {
+            var bin = Client.GetBinary();
+
+            var obj = bin.GetBuilder("FooBarBaz")
+                .SetByteField("code", 99)
+                .SetStringField("name", "abc")
+                .Build();
+
+            var cache = GetBinaryCache();
+            cache[1] = obj;
+            var res = cache.Get(1);
+
+            Assert.AreEqual("abc", res.GetField<string>("name"));
+            Assert.AreEqual(99, res.GetField<byte>("code"));
+            Assert.IsNull(res.GetField<object>("field"));
+
+            var type = res.GetBinaryType();
+            Assert.AreEqual("FooBarBaz", type.TypeName);
+            Assert.IsFalse(type.IsEnum);
+
+            CollectionAssert.AreEquivalent(new[] { "code", "name" }, type.Fields);
+            Assert.AreEqual("byte", type.GetFieldTypeName("code"));
+            Assert.AreEqual("String", type.GetFieldTypeName("name"));
+
+            Assert.AreEqual(type.TypeId, bin.GetBinaryType("FooBarBaz").TypeId);
+            Assert.AreEqual(type.TypeName, bin.GetBinaryType(type.TypeId).TypeName);
+        }
+
+        /// <summary>
+        /// Tests the builder with existing class.
+        /// </summary>
+        [Test]
+        public void TestPersonBuilder()
+        {
+            var fullCache = GetCache<Person>();
+            var cache = GetBinaryCache();
+            cache[1] = GetBinaryPerson(1);
+
+            // Modify.
+            cache[1] = cache[1].ToBuilder().SetField("Name", "Baz").Build();
+            Assert.AreEqual("Baz", fullCache[1].Name);
+
+            // Build from scratch.
+            cache[2] = Client.GetBinary().GetBuilder(typeof(Person).FullName)
+                .SetIntField("Id", 25)
+                .SetStringField("Name", "Joe")
+                .Build();
+
+            Assert.AreEqual(25, fullCache[2].Id);
+            Assert.AreEqual("Joe", fullCache[2].Name);
+
+            // Meta.
+            Assert.AreEqual(cache[2].GetBinaryType().TypeId, Client.GetBinary().GetBinaryType(typeof(Person)).TypeId);
+        }
+
+        /// <summary>
+        /// Tests the enum builder.
+        /// </summary>
+        [Test]
+        public void TestEnumBuilder()
+        {
+            var bin = Client.GetBinary();
+            var cache = GetBinaryCache();
+
+            cache[1] = bin.BuildEnum(typeof(CacheMode), "Replicated");
+            Assert.AreEqual((int) CacheMode.Replicated, cache[1].EnumValue);
+
+            Assert.Throws<NotSupportedException>(() => bin.RegisterEnum("MyEnum", new Dictionary<string, int>
+            {
+                {"Foo", 1},
+                {"Bar", 2}
+            }));
+        }
+
+        /// <summary>
+        /// Tests binary types retrieval.
+        /// </summary>
+        [Test]
+        public void TestGetBinaryTypes()
+        {
+            Assert.Throws<NotSupportedException>(() => Client.GetBinary().GetBinaryTypes());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
index 106f448..0d82479 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
@@ -472,7 +472,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
         /// Tests the Replace overload with additional argument.
         /// </summary>
         [Test]
-        public void TestReplace2()
+        public void TestReplaceIfEquals()
         {
             using (var client = GetClient())
             {

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs
new file mode 100644
index 0000000..21d3d7c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using Apache.Ignite.Core.Binary;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Thin client cache test in binary mode.
+    /// </summary>
+    public sealed class CacheTestKeepBinary : ClientTestBase
+    {
+        /// <summary>
+        /// Tests the cache put / get with primitive data types.
+        /// </summary>
+        [Test]
+        public void TestPutGetPrimitives()
+        {
+            using (var client = GetClient())
+            {
+                GetCache<string>().Put(1, "foo");
+
+                var clientCache = client.GetCache<int?, string>(CacheName)
+                    .WithKeepBinary<int?, string>();
+
+                clientCache.Put(2, "bar");
+                clientCache[3] = "baz";
+
+                // Existing key.
+                Assert.AreEqual("foo", clientCache.Get(1));
+                Assert.AreEqual("foo", clientCache[1]);
+                Assert.AreEqual("bar", clientCache[2]);
+                Assert.AreEqual("baz", clientCache[3]);
+
+                // Missing key.
+                Assert.Throws<KeyNotFoundException>(() => clientCache.Get(-1));
+
+                // Null key.
+                Assert.Throws<ArgumentNullException>(() => clientCache.Get(null));
+
+                // Null vs 0.
+                var intCache = client.GetCache<int?, int?>(CacheName);
+                intCache.Put(1, 0);
+                Assert.AreEqual(0, intCache.Get(1));
+            }
+        }
+
+        /// <summary>
+        /// Tests the cache put / get for Empty object type.
+        /// </summary>
+        [Test]
+        public void TestPutGetEmptyObject()
+        {
+            using (var client = GetClient())
+            {
+                var serverCache = GetCache<EmptyObject>();
+                var clientCache = client.GetCache<int, EmptyObject>(CacheName)
+                    .WithKeepBinary<int, IBinaryObject>();
+
+                serverCache.Put(1, new EmptyObject());
+                
+                var res = clientCache.Get(1);
+                Assert.IsNotNull(res);
+                Assert.IsInstanceOf<EmptyObject>(res.Deserialize<object>());
+            }
+        }
+
+        /// <summary>
+        /// Tests the cache put / get with user data types.
+        /// </summary>
+        [Test]
+        public void TestPutGetUserObjects([Values(true, false)] bool compactFooter)
+        {
+            var cfg = GetClientConfiguration();
+
+            cfg.BinaryConfiguration = new BinaryConfiguration
+            {
+                CompactFooter = compactFooter
+            };
+
+            using (var client = Ignition.StartClient(cfg))
+            {
+                var person = new Person {Id = 100, Name = "foo"};
+                var person2 = new Person2 {Id = 200, Name = "bar"};
+
+                var serverCache = GetCache<Person>();
+                var clientCache = client.GetCache<int?, Person>(CacheName)
+                    .WithKeepBinary<int?, IBinaryObject>();
+
+                Assert.AreEqual(CacheName, clientCache.Name);
+
+                // Put through server cache.
+                serverCache.Put(1, person);
+
+                // Put through client cache.
+                var binPerson = client.GetBinary().ToBinary<IBinaryObject>(person2);
+                clientCache.Put(2, binPerson);
+
+                // Read from client cache.
+                Assert.AreEqual("foo", clientCache.Get(1).GetField<string>("Name"));
+                Assert.AreEqual(100, clientCache[1].GetField<int>("Id"));
+                Assert.AreEqual(200, clientCache[2].GetField<int>("Id"));
+
+                // Read from server cache.
+                Assert.AreEqual("foo", serverCache.Get(1).Name);
+                Assert.AreEqual(100, serverCache[1].Id);
+                Assert.AreEqual(200, serverCache[2].Id);
+
+                // Null key or value.
+                Assert.Throws<ArgumentNullException>(() => clientCache.Put(10, null));
+                Assert.Throws<ArgumentNullException>(() => clientCache.Put(null, binPerson));
+            }
+        }
+
+        /// <summary>
+        /// Tests the GetAll method.
+        /// </summary>
+        [Test]
+        public void TestGetAll()
+        {
+            var cache = GetBinaryCache();
+            cache[1] = GetBinaryPerson(1);
+            cache[2] = GetBinaryPerson(2);
+
+            var res = cache.GetAll(new [] {1}).Single();
+            Assert.AreEqual(1, res.Key);
+            Assert.AreEqual(1, res.Value.GetField<int>("Id"));
+
+            res = cache.GetAll(new [] {1, -1}).Single();
+            Assert.AreEqual(1, res.Key);
+            Assert.AreEqual(1, res.Value.GetField<int>("Id"));
+
+            CollectionAssert.AreEquivalent(new[] {1, 2}, 
+                cache.GetAll(new [] {1, 2, 3}).Select(x => x.Value.GetField<int>("Id")));
+        }
+
+        /// <summary>
+        /// Tests the GetAndPut method.
+        /// </summary>
+        [Test]
+        public void TestGetAndPut()
+        {
+            var cache = GetBinaryCache();
+
+            Assert.IsFalse(cache.ContainsKey(1));
+
+            var res = cache.GetAndPut(1, GetBinaryPerson(1));
+            Assert.IsFalse(res.Success);
+            Assert.IsNull(res.Value);
+
+            Assert.IsTrue(cache.ContainsKey(1));
+
+            res = cache.GetAndPut(1, GetBinaryPerson(2));
+            Assert.IsTrue(res.Success);
+            Assert.AreEqual(GetBinaryPerson(1), res.Value);
+
+            Assert.AreEqual(GetBinaryPerson(2), cache[1]);
+        }
+
+        /// <summary>
+        /// Tests the GetAndReplace method.
+        /// </summary>
+        [Test]
+        public void TestGetAndReplace()
+        {
+            var cache = GetBinaryCache();
+
+            Assert.IsFalse(cache.ContainsKey(1));
+
+            var res = cache.GetAndReplace(1, GetBinaryPerson(1));
+            Assert.IsFalse(res.Success);
+            Assert.IsNull(res.Value);
+
+            Assert.IsFalse(cache.ContainsKey(1));
+            cache[1] = GetBinaryPerson(1);
+
+            res = cache.GetAndReplace(1, GetBinaryPerson(2));
+            Assert.IsTrue(res.Success);
+            Assert.AreEqual(GetBinaryPerson(1), res.Value);
+
+            Assert.AreEqual(GetBinaryPerson(2), cache[1]);
+        }
+
+        /// <summary>
+        /// Tests the GetAndRemove method.
+        /// </summary>
+        [Test]
+        public void TestGetAndRemove()
+        {
+            var cache = GetBinaryCache();
+
+            Assert.IsFalse(cache.ContainsKey(1));
+
+            var res = cache.GetAndRemove(1);
+            Assert.IsFalse(res.Success);
+            Assert.IsNull(res.Value);
+
+            Assert.IsFalse(cache.ContainsKey(1));
+            cache[1] = GetBinaryPerson(1);
+
+            res = cache.GetAndRemove(1);
+            Assert.IsTrue(res.Success);
+            Assert.AreEqual(GetBinaryPerson(1), res.Value);
+
+            Assert.IsFalse(cache.ContainsKey(1));
+        }
+
+        /// <summary>
+        /// Tests the ContainsKey method.
+        /// </summary>
+        [Test]
+        public void TestContainsKey()
+        {
+            var cache = GetBinaryKeyCache();
+
+            cache[GetBinaryPerson(25)] = 1;
+
+            Assert.IsTrue(cache.ContainsKey(GetBinaryPerson(25)));
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(26)));
+
+            Assert.Throws<ArgumentNullException>(() => cache.ContainsKey(null));
+        }
+
+        /// <summary>
+        /// Tests the ContainsKeys method.
+        /// </summary>
+        [Test]
+        public void TestContainsKeys()
+        {
+            var cache = GetBinaryKeyCache();
+
+            cache[GetBinaryPerson(1)] = 1;
+            cache[GetBinaryPerson(2)] = 2;
+            cache[GetBinaryPerson(3)] = 3;
+
+            Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1)}));
+            Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1), GetBinaryPerson(2)}));
+            Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(2), GetBinaryPerson(1)}));
+            Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1), GetBinaryPerson(2), GetBinaryPerson(3)}));
+            Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1), GetBinaryPerson(3), GetBinaryPerson(2)}));
+
+            Assert.IsFalse(cache.ContainsKeys(new[] { GetBinaryPerson(0) }));
+            Assert.IsFalse(cache.ContainsKeys(new[] { GetBinaryPerson(0), GetBinaryPerson(1) }));
+            Assert.IsFalse(cache.ContainsKeys(new[] { GetBinaryPerson(1), GetBinaryPerson(0) }));
+            Assert.IsFalse(cache.ContainsKeys(new[]
+                {GetBinaryPerson(1), GetBinaryPerson(3), GetBinaryPerson(2), GetBinaryPerson(0)}));
+
+            Assert.Throws<ArgumentNullException>(() => cache.ContainsKeys(null));
+        }
+
+        /// <summary>
+        /// Tests the PutIfAbsent method.
+        /// </summary>
+        [Test]
+        public void TestPutIfAbsent()
+        {
+            var cache = GetBinaryKeyCache();
+
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+            var res = cache.PutIfAbsent(GetBinaryPerson(1), 1);
+            Assert.IsTrue(res);
+            Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+            res = cache.PutIfAbsent(GetBinaryPerson(1), 2);
+            Assert.IsFalse(res);
+            Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+            Assert.Throws<ArgumentNullException>(() => cache.PutIfAbsent(null, 1));
+        }
+
+        /// <summary>
+        /// Tests the GetAndPutIfAbsent method.
+        /// </summary>
+        [Test]
+        public void TestGetAndPutIfAbsent()
+        {
+            var cache = GetBinaryKeyCache();
+
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+            var res = cache.GetAndPutIfAbsent(GetBinaryPerson(1), 1);
+            Assert.IsFalse(res.Success);
+            Assert.AreEqual(0, res.Value);
+            Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+            res = cache.GetAndPutIfAbsent(GetBinaryPerson(1), 2);
+            Assert.IsTrue(res.Success);
+            Assert.AreEqual(1, res.Value);
+            Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+            Assert.Throws<ArgumentNullException>(() => cache.GetAndPutIfAbsent(null, 1));
+        }
+
+        /// <summary>
+        /// Tests the Replace method.
+        /// </summary>
+        [Test]
+        public void TestReplace()
+        {
+            var cache = GetBinaryKeyCache();
+            
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+            var res = cache.Replace(GetBinaryPerson(1), 1);
+            Assert.IsFalse(res);
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+            cache[GetBinaryPerson(1)] = 1;
+
+            res = cache.Replace(GetBinaryPerson(1), 2);
+            Assert.IsTrue(res);
+            Assert.AreEqual(2, cache[GetBinaryPerson(1)]);
+
+            Assert.Throws<ArgumentNullException>(() => cache.Replace(null, 1));
+        }
+
+        /// <summary>
+        /// Tests the Replace(key, oldVal, newVal) overload: the value is replaced only when it equals oldVal.
+        /// </summary>
+        [Test]
+        [Ignore("IGNITE-7072")]
+        public void TestReplaceIfEquals()
+        {
+            var cache = GetBinaryCache();
+
+            Assert.IsFalse(cache.ContainsKey(1));
+
+            var res = cache.Replace(1, GetBinaryPerson(1), GetBinaryPerson(2));  // Missing key: nothing to replace.
+            Assert.IsFalse(res);
+            Assert.IsFalse(cache.ContainsKey(1));
+
+            cache[1] = GetBinaryPerson(1);
+
+            res = cache.Replace(1, GetBinaryPerson(-1), GetBinaryPerson(2));  // Old value does not match.
+            Assert.IsFalse(res);
+            Assert.AreEqual(GetBinaryPerson(1), cache[1]);  // Value is a binary object and must be unchanged.
+
+            res = cache.Replace(1, GetBinaryPerson(1), GetBinaryPerson(2));  // Old value matches.
+            Assert.IsTrue(res);
+            Assert.AreEqual(GetBinaryPerson(2), cache[1]);
+        }
+
+        /// <summary>
+        /// Tests the PutAll method: every entry written in one batch can be read back with GetAll.
+        /// </summary>
+        [Test]
+        public void TestPutAll()
+        {
+            var cache = GetBinaryCache();
+
+            var keys = Enumerable.Range(1, 10).ToArray();
+            cache.PutAll(keys.ToDictionary(x => x, GetBinaryPerson));
+
+            CollectionAssert.AreEquivalent(keys, cache.GetAll(keys).Select(x => x.Value.GetField<int>("Id")));
+        }
+
+        /// <summary>
+        /// Tests the Clear method with a key argument.
+        /// </summary>
+        [Test]
+        public void TestClearKey()
+        {
+            var cache = GetBinaryKeyCache();
+
+            cache[GetBinaryPerson(1)] = 1;
+            cache[GetBinaryPerson(2)] = 2;
+
+            cache.Clear(GetBinaryPerson(1));
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+            Assert.IsTrue(cache.ContainsKey(GetBinaryPerson(2)));
+
+            cache.Clear(GetBinaryPerson(2));
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(2)));
+        }
+
+        /// <summary>
+        /// Tests the Remove method.
+        /// </summary>
+        [Test]
+        public void TestRemove()
+        {
+            var cache = GetBinaryKeyCache();
+
+            cache[GetBinaryPerson(1)] = 1;
+            cache[GetBinaryPerson(2)] = 2;
+
+            var res = cache.Remove(GetBinaryPerson(1));
+            Assert.IsTrue(res);
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+            Assert.IsTrue(cache.ContainsKey(GetBinaryPerson(2)));
+
+            res = cache.Remove(GetBinaryPerson(2));
+            Assert.IsTrue(res);
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+            Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(2)));
+
+            res = cache.Remove(GetBinaryPerson(-1));
+            Assert.IsFalse(res);
+
+            Assert.Throws<ArgumentNullException>(() => cache.Remove(null));
+        }
+
+        /// <summary>
+        /// Tests the Remove method with value argument.
+        /// </summary>
+        [Test]
+        [Ignore("IGNITE-7072")]
+        public void TestRemoveKeyVal()
+        {
+            var cache = GetBinaryKeyValCache();
+
+            var x = GetBinaryPerson(1);
+            var y = GetBinaryPerson(2);
+            var z = GetBinaryPerson(0);
+
+            cache[x] = x;
+            cache[y] = y;
+
+            var res = cache.Remove(x, z);
+            Assert.IsFalse(res);
+
+            res = cache.Remove(z, z);
+            Assert.IsFalse(res);
+
+            res = cache.Remove(x, x);
+            Assert.IsTrue(res);
+            Assert.IsFalse(cache.ContainsKey(x));
+            Assert.IsTrue(cache.ContainsKey(y));
+
+            res = cache.Remove(y, y);
+            Assert.IsTrue(res);
+            Assert.IsFalse(cache.ContainsKey(x));
+            Assert.IsFalse(cache.ContainsKey(y));
+
+            res = cache.Remove(y, y);
+            Assert.IsFalse(res);
+        }
+
+        /// <summary>
+        /// Tests the RemoveAll overload that takes a set of keys (note: the method name has a typo, "Reys" for "Keys").
+        /// </summary>
+        [Test]
+        public void TestRemoveReys()
+        {
+            var cache = GetBinaryKeyCache();
+
+            var ids = Enumerable.Range(1, 10).ToArray();
+            var keys = ids.Select(GetBinaryPerson).ToArray();
+            cache.PutAll(ids.ToDictionary(GetBinaryPerson, x => x));
+
+            cache.RemoveAll(keys.Skip(2));  // Removes persons 3..10; persons 1 and 2 remain.
+            CollectionAssert.AreEquivalent(keys.Take(2), cache.GetAll(keys).Select(x => x.Key));
+
+            cache.RemoveAll(new[] {GetBinaryPerson(1)});  // Only person 2 (value 2) remains.
+            Assert.AreEqual(2, cache.GetAll(keys).Single().Value);
+
+            cache.RemoveAll(keys);
+            cache.RemoveAll(keys);  // Repeated call with already removed keys.
+
+            Assert.AreEqual(0, cache.GetSize());
+
+            Assert.Throws<ArgumentNullException>(() => cache.RemoveAll(null));
+        }
+
+        /// <summary>
+        /// Tests the WithKeepBinary logic.
+        /// </summary>
+        [Test]
+        public void TestWithKeepBinary()
+        {
+            var cache = GetBinaryCache();
+            var cache2 = cache.WithKeepBinary<int, IBinaryObject>();
+
+            Assert.AreSame(cache, cache2);
+
+            Assert.Throws<InvalidOperationException>(() => cache.WithKeepBinary<int, object>());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
index 0660a20..354b869 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.Linq;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Query;
     using Apache.Ignite.Core.Client;
@@ -136,6 +137,26 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
             }
         }
 
+        /// <summary>
+        /// Tests scan query with .NET filter in binary mode.
+        /// </summary>
+        [Test]
+        public void TestWithFilterBinary()
+        {
+            GetPersonCache();
+
+            using (var client = GetClient())
+            {
+                var clientCache = client.GetCache<int, Person>(CacheName);
+                var binCache = clientCache.WithKeepBinary<int, IBinaryObject>();
+
+                // One result.
+                var single = binCache.Query(new ScanQuery<int, IBinaryObject>(new PersonIdFilterBinary(8))).Single();
+                Assert.AreEqual(8, single.Key);
+            }
+        }
+
+
 #if !NETCOREAPP2_0   // Serializing delegates and exceptions is not supported on this platform.
         /// <summary>
         /// Tests the exception in filter.
@@ -195,6 +216,10 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
                     Assert.IsTrue(cur2.MoveNext());
                     Assert.IsTrue(cur3.MoveNext());
 
+                    Assert.IsNotNull(cur1.Current);
+                    Assert.IsNotNull(cur2.Current);
+                    Assert.IsNotNull(cur3.Current);
+
                     Assert.AreEqual(cur1.Current.Key, cur2.Current.Key);
                     Assert.AreEqual(cur1.Current.Key, cur3.Current.Key);
                 }
@@ -289,5 +314,28 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
                 return entry.Key == _key;
             }
         }
+
+        /// <summary>
+        /// Person filter for binary mode: matches entries whose binary value has the given "Id" field.
+        /// </summary>
+        private class PersonIdFilterBinary : ICacheEntryFilter<int, IBinaryObject>
+        {
+            /** Person id to match. */
+            private readonly int _id;
+
+            /// <summary>
+            /// Initializes a new instance of the <see cref="PersonIdFilterBinary"/> class.
+            /// </summary>
+            public PersonIdFilterBinary(int id)
+            {
+                _id = id;
+            }
+
+            /** <inheritdoc /> */
+            public bool Invoke(ICacheEntry<int, IBinaryObject> entry)
+            {
+                return entry.Value.GetField<int>("Id") == _id;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
index e1d30b9..6177f34 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
@@ -17,10 +17,13 @@
 
 namespace Apache.Ignite.Core.Tests.Client
 {
+    using System;
     using System.Net;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Client;
     using Apache.Ignite.Core.Client.Cache;
+    using Apache.Ignite.Core.Tests.Client.Cache;
     using NUnit.Framework;
 
     /// <summary>
@@ -134,5 +137,45 @@ namespace Apache.Ignite.Core.Tests.Client
         {
             return TestUtils.GetTestConfiguration();
         }
+
+        /// <summary>
+        /// Converts object to binary form.
+        /// </summary>
+        protected IBinaryObject ToBinary(object o)
+        {
+            return Client.GetBinary().ToBinary<IBinaryObject>(o);
+        }
+
+        /// <summary>
+        /// Gets the binary cache.
+        /// </summary>
+        protected ICacheClient<int, IBinaryObject> GetBinaryCache()
+        {
+            return Client.GetCache<int, Person>(CacheName).WithKeepBinary<int, IBinaryObject>();
+        }
+
+        /// <summary>
+        /// Gets the binary key cache.
+        /// </summary>
+        protected ICacheClient<IBinaryObject, int> GetBinaryKeyCache()
+        {
+            return Client.GetCache<Person, int>(CacheName).WithKeepBinary<IBinaryObject, int>();
+        }
+
+        /// <summary>
+        /// Gets the binary key-val cache.
+        /// </summary>
+        protected ICacheClient<IBinaryObject, IBinaryObject> GetBinaryKeyValCache()
+        {
+            return Client.GetCache<Person, Person>(CacheName).WithKeepBinary<IBinaryObject, IBinaryObject>();
+        }
+
+        /// <summary>
+        /// Gets the binary person.
+        /// </summary>
+        protected IBinaryObject GetBinaryPerson(int id)
+        {
+            return ToBinary(new Person(id) { DateTime = DateTime.MinValue.ToUniversalTime() });
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
index eb91b0a..47b780d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Core.Client.Cache
 {
+    using System;
     using System.Collections.Generic;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Query;
@@ -239,5 +240,15 @@ namespace Apache.Ignite.Core.Client.Cache
         /// Gets the cache configuration.
         /// </summary>
         CacheClientConfiguration GetConfiguration();
+
+        /// <summary>
+        /// Gets cache with KeepBinary mode enabled, changing key and/or value types if necessary.
+        /// You can only change key/value types when transitioning from non-binary to binary cache.
+        /// Changing the type of a binary cache is not allowed and will throw an <see cref="InvalidOperationException"/>.
+        /// </summary>
+        /// <typeparam name="TK1">Key type in binary mode.</typeparam>
+        /// <typeparam name="TV1">Value type in binary mode.</typeparam>
+        /// <returns>Cache instance with binary mode enabled.</returns>
+        ICacheClient<TK1, TV1> WithKeepBinary<TK1, TV1>();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
index d5ba835..9bca883 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
@@ -19,6 +19,8 @@ namespace Apache.Ignite.Core.Client
 {
     using System;
     using System.Collections.Generic;
+    using System.Diagnostics.CodeAnalysis;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Client.Cache;
 
     /// <summary>
@@ -90,5 +92,12 @@ namespace Apache.Ignite.Core.Client
         /// </summary>
         /// <param name="name">The name of the cache to stop.</param>
         void DestroyCache(string name);
+
+        /// <summary>
+        /// Gets Ignite binary services.
+        /// </summary>
+        /// <returns>Instance of <see cref="IBinary"/> interface</returns>
+        [SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Semantics.")]
+        IBinary GetBinary();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
index fac7d18..7d62f3a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
@@ -537,10 +537,7 @@ namespace Apache.Ignite.Core.Impl.Binary
                 // 3. Handle metadata.
                 if (metaHnd != null)
                 {
-                    IDictionary<string, BinaryField> meta = metaHnd.OnObjectWriteFinished();
-
-                    if (meta != null)
-                        _parent._ctx.Writer.SaveMetadata(desc, meta);
+                    _parent._ctx.Writer.SaveMetadata(desc, metaHnd.OnObjectWriteFinished());
                 }
             }
             finally

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
index 816e24a..febecd4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
@@ -17,7 +17,6 @@
 
 namespace Apache.Ignite.Core.Impl.Binary
 {
-    using System;
     using System.Collections.Generic;
     using System.Diagnostics;
     using Apache.Ignite.Core.Binary;
@@ -53,13 +52,13 @@ namespace Apache.Ignite.Core.Impl.Binary
         public BinaryType GetBinaryType(int typeId)
         {
             return _socket.DoOutInOp(ClientOp.BinaryTypeGet, s => s.WriteInt(typeId),
-                s => new BinaryType(_marsh.StartUnmarshal(s), true));
+                s => s.ReadBool() ? new BinaryType(_marsh.StartUnmarshal(s), true) : null);
         }
 
         /** <inheritdoc /> */
         public List<IBinaryType> GetBinaryTypes()
         {
-            throw new NotSupportedException();
+            throw IgniteClient.GetClientNotSupportedException();
         }
 
         /** <inheritdoc /> */
@@ -96,7 +95,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /** <inheritdoc /> */
         public BinaryType RegisterEnum(string typeName, IEnumerable<KeyValuePair<string, int>> values)
         {
-            throw new NotSupportedException();
+            throw IgniteClient.GetClientNotSupportedException();
         }
 
         /** <inheritdoc /> */

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
index 76c7b00..0ed2c6d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
@@ -55,14 +55,15 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
         private readonly Marshaller _marsh;
 
         /** Keep binary flag. */
-        private readonly bool _keepBinary = false;
+        private readonly bool _keepBinary;
 
         /// <summary>
         /// Initializes a new instance of the <see cref="CacheClient{TK, TV}" /> class.
         /// </summary>
         /// <param name="ignite">Ignite.</param>
         /// <param name="name">Cache name.</param>
-        public CacheClient(IgniteClient ignite, string name)
+        /// <param name="keepBinary">Binary mode flag.</param>
+        public CacheClient(IgniteClient ignite, string name, bool keepBinary = false)
         {
             Debug.Assert(ignite != null);
             Debug.Assert(name != null);
@@ -71,6 +72,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
             _ignite = ignite;
             _marsh = _ignite.Marshaller;
             _id = BinaryUtils.GetCacheId(name);
+            _keepBinary = keepBinary;
         }
 
         /** <inheritDoc /> */
@@ -358,6 +360,26 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
             return DoOutInOp(ClientOp.CacheGetConfiguration, null, s => new CacheClientConfiguration(s));
         }
 
+        /** <inheritDoc /> */
+        public ICacheClient<TK1, TV1> WithKeepBinary<TK1, TV1>()
+        {
+            if (_keepBinary)  // Already in binary mode: only a call with compatible type arguments is allowed.
+            {
+                var result = this as ICacheClient<TK1, TV1>;  // Null when the requested types differ from ours.
+
+                if (result == null)
+                {
+                    throw new InvalidOperationException(
+                        "Can't change type of binary cache. WithKeepBinary has been called on an instance of " +
+                        "binary cache with incompatible generic arguments.");
+                }
+
+                return result;  // Same instance, no new object is created.
+            }
+
+            return new CacheClient<TK1, TV1>(_ignite, _name, true);  // New binary-mode client for the same cache name.
+        }
+
         /// <summary>
         /// Does the out in op.
         /// </summary>
@@ -402,7 +424,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
 
             stream.Seek(-1, SeekOrigin.Current);
 
-            return _marsh.Unmarshal<T>(stream);
+            return _marsh.Unmarshal<T>(stream, _keepBinary);
         }
 
         /// <summary>
@@ -419,7 +441,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
 
             stream.Seek(-1, SeekOrigin.Current);
 
-            return new CacheResult<T>(_marsh.Unmarshal<T>(stream));
+            return new CacheResult<T>(_marsh.Unmarshal<T>(stream, _keepBinary));
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
index 2dd18cc..10bf38f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
@@ -47,6 +47,9 @@ namespace Apache.Ignite.Core.Impl.Client
         /** Binary processor. */
         private readonly IBinaryProcessor _binProc;
 
+        /** Binary. */
+        private readonly IBinary _binary;
+
         /// <summary>
         /// Initializes a new instance of the <see cref="IgniteClient"/> class.
         /// </summary>
@@ -63,6 +66,8 @@ namespace Apache.Ignite.Core.Impl.Client
             };
 
             _binProc = clientConfiguration.BinaryProcessor ?? new BinaryProcessorClient(_socket);
+
+            _binary = new Binary(_marsh);
         }
 
         /// <summary>
@@ -154,7 +159,7 @@ namespace Apache.Ignite.Core.Impl.Client
         /** <inheritDoc /> */
         public IBinary GetBinary()
         {
-            throw GetClientNotSupportedException();
+            return _binary;
         }
 
         /** <inheritDoc /> */


[5/6] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-zk

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47971818
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47971818
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47971818

Branch: refs/heads/ignite-zk
Commit: 47971818e2e8dddb63e3714f1745246fc26dcbc3
Parents: c1e2033 35e621f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 30 11:59:58 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 30 11:59:58 2017 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           |  22 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |  15 +-
 .../processors/hadoop/HadoopJobInfo.java        |   7 +
 .../binary/ClientBinaryTypeGetRequest.java      |   4 +-
 .../binary/ClientBinaryTypeGetResponse.java     |  10 +-
 .../processors/hadoop/impl/HadoopUtils.java     |  45 +-
 .../impl/fs/HadoopFileSystemCacheUtils.java     |  34 +-
 .../hadoop/impl/proto/HadoopClientProtocol.java |   8 +-
 .../processors/hadoop/impl/v2/HadoopV2Job.java  |  32 +-
 .../hadoop/impl/v2/HadoopV2TaskContext.java     |  62 ++-
 .../impl/HadoopAbstractMapReduceTest.java       |   2 +-
 .../hadoop/impl/HadoopGroupingTest.java         |   2 +-
 .../hadoop/impl/HadoopJobTrackerSelfTest.java   |   4 +-
 .../impl/HadoopMapReduceEmbeddedSelfTest.java   |   2 +-
 .../hadoop/impl/HadoopPlannerMockJob.java       |   6 +
 .../hadoop/impl/HadoopSortingTest.java          |   4 +-
 .../impl/HadoopTaskExecutionSelfTest.java       |  10 +-
 .../hadoop/impl/HadoopTasksV1Test.java          |   2 +-
 .../hadoop/impl/HadoopTasksV2Test.java          |   2 +-
 .../hadoop/impl/HadoopTeraSortTest.java         |   2 +-
 .../hadoop/impl/HadoopV2JobSelfTest.java        |   2 +-
 .../collections/HadoopAbstractMapTest.java      |   6 +
 .../HadoopExternalTaskExecutionSelfTest.java    |   4 +-
 .../Apache.Ignite.Core.Tests.DotNetCore.csproj  |   2 +
 .../Apache.Ignite.Core.Tests.csproj             |   2 +
 .../Cache/CacheAbstractTest.cs                  |  26 +
 .../Client/Cache/BinaryBuilderTest.cs           | 118 +++++
 .../Client/Cache/CacheTest.cs                   |   2 +-
 .../Client/Cache/CacheTestKeepBinary.cs         | 499 +++++++++++++++++++
 .../Client/Cache/ScanQueryTest.cs               |  48 ++
 .../Client/ClientTestBase.cs                    |  43 ++
 .../Client/Cache/ICacheClient.cs                |  11 +
 .../Apache.Ignite.Core/Client/IIgniteClient.cs  |   9 +
 .../Impl/Binary/BinaryObjectBuilder.cs          |   5 +-
 .../Impl/Binary/BinaryProcessorClient.cs        |   7 +-
 .../Impl/Client/Cache/CacheClient.cs            |  30 +-
 .../Impl/Client/IgniteClient.cs                 |   7 +-
 37 files changed, 1007 insertions(+), 89 deletions(-)
----------------------------------------------------------------------



[2/6] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fa9de3e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fa9de3e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fa9de3e

Branch: refs/heads/ignite-zk
Commit: 5fa9de3ef302937e2cea22a2f0f29c957e5c7313
Parents: 438760e 89c82f5
Author: devozerov <vo...@gridgain.com>
Authored: Wed Nov 29 13:50:33 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:33 2017 +0300

----------------------------------------------------------------------
 .../cache/binary/BinaryMetadataTransport.java   |  20 ++-
 .../continuous/GridContinuousProcessor.java     | 132 ++++++++++++-------
 2 files changed, 93 insertions(+), 59 deletions(-)
----------------------------------------------------------------------



[3/6] ignite git commit: IGNITE-7016 Avoid WAL segment fsync on header write in non-default mode

Posted by sb...@apache.org.
IGNITE-7016 Avoid WAL segment fsync on header write in non-default mode


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2b145a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2b145a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2b145a1

Branch: refs/heads/ignite-zk
Commit: c2b145a1b14160dcf645d7ceddb2b5d58b40b43a
Parents: 5fa9de3
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Nov 29 19:13:20 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 29 19:13:20 2017 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b145a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a450521..948a8ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1304,16 +1304,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         synchronized (this) {
                             while (locked.containsKey(toArchive) && !stopped)
                                 wait();
+                        }
 
-                            // Firstly, format working file
-                            if (!stopped)
-                                formatFile(res.getOrigWorkFile());
+                        // Firstly, format working file
+                        if (!stopped)
+                            formatFile(res.getOrigWorkFile());
 
+                        synchronized (this) {
                             // Then increase counter to allow rollover on clean working file
                             changeLastArchivedIndexAndWakeupCompressor(toArchive);
 
                             notifyAll();
                         }
+
                         if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED))
                             evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(),
                                 res.getAbsIdx(), res.getDstArchiveFile()));
@@ -1369,7 +1372,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     return curAbsWalIdx;
                 }
             }
-           catch (InterruptedException e) {
+            catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
 
                 throw new IgniteInterruptedCheckedException(e);
@@ -1889,7 +1892,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @return I/O position after write version.
      * @throws IOException If failed to write serializer version.
      */
-    public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException {
+    public static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException {
         ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false);
 
         do {
@@ -1898,7 +1901,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         while (buffer.hasRemaining());
 
         // Flush
-        io.force();
+        if (mode == WALMode.DEFAULT)
+            io.force();
 
         return io.position();
     }
@@ -2205,9 +2209,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          */
         public void writeSerializerVersion() throws IgniteCheckedException {
             try {
-                assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position();
+                assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " +
+                    fileIO.position();
 
-                long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx, serializer.version());
+                long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx,
+                    serializer.version(), mode);
 
                 written = updatedPosition;
                 lastFsyncPos = updatedPosition;


[6/6] ignite git commit: zk

Posted by sb...@apache.org.
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/827b7085
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/827b7085
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/827b7085

Branch: refs/heads/ignite-zk
Commit: 827b708545cf851f784998c5faef874bda8e0898
Parents: 4797181
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 30 15:23:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 30 16:47:05 2017 +0300

----------------------------------------------------------------------
 .../binary/CacheObjectBinaryProcessorImpl.java  |   3 +
 .../discovery/zk/internal/ZookeeperClient.java  |  44 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 500 ++++++++-----------
 .../ZookeeperDiscoverySpiBasicTest.java         |  31 ++
 4 files changed, 287 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ed4c520..e7f86cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -440,6 +440,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
             BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
 
+            if (oldMeta == mergedMeta)
+                return;
+
             MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
 
             assert res != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index ea9b289..73547cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -521,6 +521,13 @@ public class ZookeeperClient implements Watcher {
 
     /**
      * @param path Path.
+     */
+    void deleteIfExistsAsync(String path) {
+        new DeleteIfExistsOperation(path).execute();
+    }
+
+    /**
+     * @param path Path.
      * @param watcher Watcher.
      * @param cb Callback.
      */
@@ -842,6 +849,43 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
+    class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /**
+         * @param path Path.
+         */
+        DeleteIfExistsOperation(String path) {
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            zk.delete(path, -1, this, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx) {
+            if (rc == KeeperException.Code.NONODE.intValue())
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [" +
+                    "path=" + path + ']');
+
+                retryQ.add(this);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+            else
+                assert rc == 0 : rc;
+        }
+    }
+
+    /**
+     *
+     */
     class CreateCallbackWrapper implements AsyncCallback.StringCallback {
         /** */
         final CreateOperation op;

http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 0ecbaf3..38f9cbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -27,9 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -127,16 +125,16 @@ public class ZookeeperDiscoveryImpl {
     private final int evtsAckThreshold;
 
     /** */
-    private ZkRuntimeState state = new ZkRuntimeState(false);
+    private ZkRuntimeState state;
 
     /** */
     private volatile ConnectionState connState = ConnectionState.STARTED;
 
     /** */
-    private ZkEventWorker evtWorker;
+    private final AtomicBoolean stop = new AtomicBoolean();
 
     /** */
-    private final AtomicBoolean stop = new AtomicBoolean();
+    private final Object stateMux = new Object();
 
     /**
      * @param log Logger.
@@ -213,7 +211,8 @@ public class ZookeeperDiscoveryImpl {
      * @return Ping result.
      */
     public boolean pingNode(UUID nodeId) {
-        checkState();
+        while (!busyLock.enterBusy())
+            checkState();
 
         // TODO ZK
         return node(nodeId) != null;
@@ -225,14 +224,96 @@ public class ZookeeperDiscoveryImpl {
     public void reconnect() {
         assert clientReconnectEnabled;
 
-        evtWorker.onReconnectRequest();
+        synchronized (stateMux) {
+            if (connState == ConnectionState.STARTED)
+                connState = ConnectionState.DISCONNECTED;
+            else
+                return;
+        }
+
+        state.zkClient.onCloseStart();
+
+        busyLock.block();
+
+        busyLock.unblock();
+
+        state.zkClient.close();
+
+        UUID newId = UUID.randomUUID();
+
+        U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+            "newId=" + newId +
+            ", prevId=" + locNode.id() +
+            ", locNode=" + locNode + ']');
+
+        doReconnect(newId);
+    }
+
+    /**
+     * @param newId New ID.
+     */
+    private void doReconnect(UUID newId) {
+        locNode.onClientDisconnected(newId);
+
+        if (state.joined) {
+            assert state.evtsData != null;
+
+            lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+                state.evtsData.topVer,
+                locNode,
+                state.top.topologySnapshot(),
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+        }
+
+        try {
+            joinTopology0(state.joined);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to reconnect: " + e, e);
+
+            onSegmented(e);
+        }
+    }
+
+
+    /**
+     * @param e Error.
+     */
+    private void onSegmented(Exception e) {
+        if (state.joined) {
+            synchronized (stateMux) {
+                connState = ConnectionState.STOPPED;
+            }
+
+            zkClient().zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), null);
+        }
+        else
+            joinFut.onDone(e);
+    }
+
+    /**
+     *
+     */
+    class SegmentedWatcher implements AsyncCallback.VoidCallback {
+        @Override public void processResult(int rc, String path, Object ctx) {
+            assert state.evtsData != null;
+
+            lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+                state.evtsData.topVer,
+                locNode,
+                state.top.topologySnapshot(),
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+        }
     }
 
     /**
      * @return Remote nodes.
      */
     public Collection<ClusterNode> remoteNodes() {
-        checkState();
+        while (!busyLock.enterBusy())
+            checkState();
 
         return state.top.remoteNodes();
     }
@@ -258,14 +339,9 @@ public class ZookeeperDiscoveryImpl {
      * @return {@code True} if node joined or joining topology.
      */
     public boolean knownNode(UUID nodeId) {
-        checkState();
-
-        if (!busyLock.enterBusy()) {
+        while (!busyLock.enterBusy())
             checkState();
 
-            throw new IgniteSpiException("Zookeeper client closed.");
-        }
-
         try {
             List<String> children = state.zkClient.getChildren(zkPaths.aliveNodesDir);
 
@@ -295,8 +371,6 @@ public class ZookeeperDiscoveryImpl {
      * @param msg Message.
      */
     public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
-        checkState();
-
         assert msg != null;
 
         byte[] msgBytes;
@@ -308,12 +382,9 @@ public class ZookeeperDiscoveryImpl {
             throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
         }
 
-        if (!busyLock.enterBusy()) {
+        while (!busyLock.enterBusy())
             checkState();
 
-            throw new IgniteSpiException("Zookeeper client closed.");
-        }
-
         try {
             String prefix = UUID.randomUUID().toString();
 
@@ -367,7 +438,9 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @throws InterruptedException If interrupted.
      */
-    private void joinTopology0(boolean reconnect) throws InterruptedException {
+    private void joinTopology0(boolean prevJoined) throws InterruptedException {
+        state = new ZkRuntimeState(prevJoined);
+
         DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
 
         exchange.collect(discoDataBag);
@@ -394,14 +467,6 @@ public class ZookeeperDiscoveryImpl {
             throw new IgniteSpiException("Failed to create Zookeeper client", e);
         }
 
-        if (!reconnect) {
-            evtWorker = new ZkEventWorker(igniteInstanceName, "zookeeper-disco-evt-worker", log);
-
-            evtWorker.start();
-        }
-        else
-            assert evtWorker != null;
-
         startJoin(joinDataBytes);
     }
 
@@ -822,37 +887,32 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Delete join data: " + joinDataPath);
 
-        // TODO ZK async
-        state.zkClient.deleteIfExists(joinDataPath, -1);
+        state.zkClient.deleteIfExistsAsync(joinDataPath);
 
         final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                if (connState == ConnectionState.DISCONNECTED)
-                    connState = ConnectionState.STARTED;
-
-                lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
-                    1L,
-                    locNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-
-                if (state.prevJoined) {
-                    lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
-                        1L,
-                        locNode,
-                        topSnapshot,
-                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                        null);
-
-                    U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
-                }
+        if (connState == ConnectionState.DISCONNECTED)
+            connState = ConnectionState.STARTED;
 
-                joinFut.onDone();
-            }
-        });
+        lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
+            1L,
+            locNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+
+        if (state.prevJoined) {
+            lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+                1L,
+                locNode,
+                topSnapshot,
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+
+            U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+        }
+
+        joinFut.onDone();
     }
 
     /**
@@ -948,8 +1008,8 @@ public class ZookeeperDiscoveryImpl {
 
                         state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData, null);
 
-                        if (log.isInfoEnabled())
-                            log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
+                        if (log.isDebugEnabled())
+                            log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
@@ -958,7 +1018,7 @@ public class ZookeeperDiscoveryImpl {
                 else {
                     U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
 
-                    state.zkClient.deleteIfExists(evtDataPath, -1);
+                    state.zkClient.deleteIfExistsAsync(evtDataPath);
                 }
 
                 state.evtsData.procCustEvt = evtE.getKey();
@@ -1005,7 +1065,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void processNewEvents(ZkDiscoveryEventsData evtsData) throws Exception {
+    private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception {
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
         boolean updateNodeInfo = false;
@@ -1026,8 +1086,8 @@ public class ZookeeperDiscoveryImpl {
                     processLocalJoin(evtsData, evtData0);
             }
             else {
-                if (log.isInfoEnabled())
-                    log.info("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
+                if (log.isDebugEnabled())
+                    log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
 
                 switch (evtData.eventType()) {
                     case EventType.EVT_NODE_JOINED: {
@@ -1165,40 +1225,35 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                if (connState == ConnectionState.DISCONNECTED)
-                    connState = ConnectionState.STARTED;
-
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    locNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-
-                if (state.prevJoined) {
-                    lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
-                        evtData.topologyVersion(),
-                        locNode,
-                        topSnapshot,
-                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                        null);
-
-                    U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
-                }
+        if (connState == ConnectionState.DISCONNECTED)
+            connState = ConnectionState.STARTED;
 
-                joinFut.onDone();
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            locNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+
+        if (state.prevJoined) {
+            lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+                evtData.topologyVersion(),
+                locNode,
+                topSnapshot,
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+
+            U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+        }
+
+        joinFut.onDone();
 
         state.joined = true;
 
-        if (log.isInfoEnabled())
-            log.info("Delete data for joined: " + path);
+        if (log.isDebugEnabled())
+            log.debug("Delete data for joined: " + path);
 
-        // TODO ZK: async
-        state.zkClient.deleteIfExists(path, -1);
+        state.zkClient.deleteIfExistsAsync(path);
     }
 
     /**
@@ -1207,8 +1262,8 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
-        if (log.isInfoEnabled())
-            log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
+        if (log.isDebugEnabled())
+            log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
 
         final ZookeeperClusterNode sndNode = state.top.nodesById.get(evtData.sndNodeId);
 
@@ -1216,16 +1271,12 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    sndNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    msg);
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            sndNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            msg);
     }
 
     /**
@@ -1245,16 +1296,12 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    joinedNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            joinedNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
     }
 
     /**
@@ -1268,16 +1315,12 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        evtWorker.evtsQ.add(new Runnable() {
-            @Override public void run() {
-                lsnr.onDiscovery(evtData.eventType(),
-                    evtData.topologyVersion(),
-                    failedNode,
-                    topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-        });
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            failedNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
     }
 
     /**
@@ -1326,8 +1369,8 @@ public class ZookeeperDiscoveryImpl {
 
                             String path = zkPaths.ackEventDataPath(evtId);
 
-                            if (log.isInfoEnabled())
-                                log.info("Create ack event: " + path);
+                            if (log.isDebugEnabled())
+                                log.debug("Create ack event: " + path);
 
                             // TODO ZK: delete is previous exists?
                             state.zkClient.createIfNeeded(
@@ -1349,9 +1392,10 @@ public class ZookeeperDiscoveryImpl {
 
                             newEvts.add(ackEvtData);
 
-                            if (log.isInfoEnabled()) {
-                                log.info("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
+                            if (log.isDebugEnabled()) {
+                                log.debug("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
                                     ", evt=" + ackEvtData +
+                                    ", evtSize=" + ackBytes.length +
                                     ", msg=" + ack + ']');
                             }
                         }
@@ -1360,8 +1404,8 @@ public class ZookeeperDiscoveryImpl {
                     }
 
                     case EventType.EVT_NODE_FAILED: {
-                        if (log.isInfoEnabled())
-                            log.info("All nodes processed node fail [evtData=" + evtData + ']');
+                        if (log.isDebugEnabled())
+                            log.debug("All nodes processed node fail [evtData=" + evtData + ']');
 
                         break; // Do not need addition cleanup.
                     }
@@ -1417,8 +1461,8 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']');
 
-        state.zkClient.deleteIfExists(evtDataPath, -1);
-        state.zkClient.deleteIfExists(dataForJoinedPath, -1);
+        state.zkClient.deleteIfExistsAsync(evtDataPath);
+        state.zkClient.deleteIfExistsAsync(dataForJoinedPath);
     }
 
     /**
@@ -1429,15 +1473,16 @@ public class ZookeeperDiscoveryImpl {
     @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
         throws Exception
     {
-        if (log.isInfoEnabled())
-            log.info("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
+        if (log.isDebugEnabled())
+            log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
 
         if (!evtData.ackEvent()) {
             String path = zkPaths.customEventDataPath(false, evtData.evtPath);
 
-            log.info("Delete path: " + path);
+            if (log.isDebugEnabled())
+                log.debug("Delete path: " + path);
 
-            state.zkClient.deleteIfExists(path, -1);
+            state.zkClient.deleteIfExistsAsync(path);
 
             assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
 
@@ -1447,9 +1492,10 @@ public class ZookeeperDiscoveryImpl {
         else {
             String path = zkPaths.ackEventDataPath(evtData.eventId());
 
-            log.info("Delete path: " + path);
+            if (log.isDebugEnabled())
+                log.debug("Delete path: " + path);
 
-            state.zkClient.deleteIfExists(path, -1);
+            state.zkClient.deleteIfExistsAsync(path);
         }
 
         return null;
@@ -1472,7 +1518,9 @@ public class ZookeeperDiscoveryImpl {
 
         log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']');
 
-        connState = ConnectionState.STOPPED;
+        synchronized (stateMux) {
+            connState = ConnectionState.STOPPED;
+        }
 
         ZookeeperClient zkClient = state.zkClient;
 
@@ -1481,18 +1529,12 @@ public class ZookeeperDiscoveryImpl {
 
         busyLock.block();
 
+        busyLock.unblock();
+
         joinFut.onDone(e);
 
         if (zkClient != null)
             zkClient.close();
-
-        ZkEventWorker evtWorker = this.evtWorker;
-
-        if (evtWorker != null) {
-            evtWorker.interrupt();
-
-            evtWorker.join();
-        }
     }
 
     /**
@@ -1552,165 +1594,27 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class ZkEventWorker extends IgniteSpiThread {
-        /** */
-        private final Runnable RECONNECT = new Runnable() {@Override public void run() {}};
-
-        /** */
-        private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}};
-
-        /** */
-        private final BlockingQueue<Runnable> evtsQ;
-
-        /**
-         * @param igniteInstanceName Ignite instance name.
-         * @param name Thread name.
-         * @param log Logger.
-         */
-        ZkEventWorker(String igniteInstanceName, String name, IgniteLogger log) {
-            super(igniteInstanceName, name, log);
-
-            evtsQ = new LinkedBlockingQueue<>();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            while (!isInterrupted()) {
-                Runnable r = evtsQ.take();
-
-                if (r == RECONNECT)
-                    processReconnect();
-                if (r == CONNECTION_LOST)
-                    processConnectionLost();
-                else {
-                    if (!busyLock.enterBusy())
-                        return;
-
-                    try {
-                        r.run();
-                    }
-                    finally {
-                        busyLock.leaveBusy();
-                    }
-                }
-            }
-        }
-
+    private class ReconnectorThread extends IgniteSpiThread {
         /**
          *
          */
-        void onConnectionLoss() {
-            evtsQ.add(CONNECTION_LOST);
+        ReconnectorThread() {
+            super(ZookeeperDiscoveryImpl.this.igniteInstanceName, "zk-reconnector", log);
         }
 
-        /**
-         *
-         */
-        void onReconnectRequest() {
-            evtsQ.add(RECONNECT);
-        }
-
-        /**
-         *
-         */
-        private void processReconnect() {
-            assert locNode.isClient() : locNode;
-
-            if (connState == ConnectionState.DISCONNECTED)
-                return;
-
-            connState = ConnectionState.DISCONNECTED;
-
-            state.zkClient.onCloseStart();
-
+        @Override protected void body() throws InterruptedException {
             busyLock.block();
 
             busyLock.unblock();
 
-            state.zkClient.close();
-
             UUID newId = UUID.randomUUID();
 
-            U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+            U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
                 "newId=" + newId +
                 ", prevId=" + locNode.id() +
                 ", locNode=" + locNode + ']');
 
-            reconnect(newId);
-        }
-
-        /**
-         *
-         */
-        void processConnectionLost() {
-            if (clientReconnectEnabled) {
-                connState = ConnectionState.DISCONNECTED;
-
-                busyLock.block();
-
-                busyLock.unblock();
-
-                UUID newId = UUID.randomUUID();
-
-                U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
-                    "newId=" + newId +
-                    ", prevId=" + locNode.id() +
-                    ", locNode=" + locNode + ']');
-
-                reconnect(newId);
-            }
-            else {
-                U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
-
-                onSegmented(new IgniteSpiException("Zookeeper connection loss."));
-            }
-        }
-
-        /**
-         * @param newId New ID.
-         */
-        private void reconnect(UUID newId) {
-            locNode.onClientDisconnected(newId);
-
-            if (state.joined) {
-                assert state.evtsData != null;
-
-                lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
-                    state.evtsData.topVer,
-                    locNode,
-                    state.top.topologySnapshot(),
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-
-            state = new ZkRuntimeState(state.joined);
-
-            try {
-                joinTopology0(true);
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to reconnect: " + e, e);
-
-                onSegmented(e);
-            }
-        }
-
-        /**
-         * @param e Error.
-         */
-        private void onSegmented(Exception e) {
-            if (state.joined) {
-                assert state.evtsData != null;
-
-                lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
-                    state.evtsData.topVer,
-                    locNode,
-                    state.top.topologySnapshot(),
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
-            }
-            else
-                joinFut.onDone(e);
+            doReconnect(newId);
         }
     }
 
@@ -1723,7 +1627,21 @@ public class ZookeeperDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override public void run() {
-            evtWorker.onConnectionLoss();
+            if (clientReconnectEnabled) {
+                synchronized (stateMux) {
+                    if (connState == ConnectionState.STARTED)
+                        connState = ConnectionState.DISCONNECTED;
+                    else
+                        return;
+                }
+                // Only the caller that moved STARTED -> DISCONNECTED gets here; stateMux is released by now.
+                new ReconnectorThread().start();
+            }
+            else {
+                U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
+
+                onSegmented(new IgniteSpiException("Zookeeper connection loss."));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 8ae84c1..d50e9b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.File;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -306,6 +307,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testMetadataUpdate() throws Exception {
+        startGrid(0);
+        // Marshals C1 and C2 via node 0's marshaller; 64 is presumably the thread count - TODO confirm.
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite(0).configuration().getMarshaller().marshal(new C1());
+                ignite(0).configuration().getMarshaller().marshal(new C2());
+
+                return null;
+            }
+        }, 64, "marshal");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientNodesStatus() throws Exception {
         startGrid(0);
 
@@ -1661,4 +1678,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             return data;
         }
     }
+
+    /**
+     * Empty serializable class; instances are marshalled by {@code testMetadataUpdate()}.
+     */
+    private static class C1 implements Serializable {
+        // No-op.
+    }
+
+    /**
+     * Second empty serializable class; instances are marshalled by {@code testMetadataUpdate()}.
+     */
+    private static class C2 implements Serializable {
+        // No-op.
+    }
 }