You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/30 13:47:37 UTC
[1/6] ignite git commit: IGNITE-6992: Hadoop: fixed MR problem with
HDFS access when Kerberos is enabled. This closes #3097.
Repository: ignite
Updated Branches:
refs/heads/ignite-zk c1e20330d -> 827b70854
IGNITE-6992: Hadoop: fixed MR problem with HDFS access when Kerberos is enabled. This closes #3097.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/438760ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/438760ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/438760ed
Branch: refs/heads/ignite-zk
Commit: 438760ed7f9d37bb72de5e5a38d46ce2450544f8
Parents: 5fa5ae7
Author: Evgenii Zhuravlev <e....@gmail.com>
Authored: Wed Nov 29 13:50:13 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:13 2017 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopDefaultJobInfo.java | 15 ++++-
.../processors/hadoop/HadoopJobInfo.java | 7 +++
.../processors/hadoop/impl/HadoopUtils.java | 45 +++++++++++++-
.../impl/fs/HadoopFileSystemCacheUtils.java | 34 ++++++-----
.../hadoop/impl/proto/HadoopClientProtocol.java | 8 ++-
.../processors/hadoop/impl/v2/HadoopV2Job.java | 32 ++++++++--
.../hadoop/impl/v2/HadoopV2TaskContext.java | 62 +++++++++++++-------
.../impl/HadoopAbstractMapReduceTest.java | 2 +-
.../hadoop/impl/HadoopGroupingTest.java | 2 +-
.../hadoop/impl/HadoopJobTrackerSelfTest.java | 4 +-
.../impl/HadoopMapReduceEmbeddedSelfTest.java | 2 +-
.../hadoop/impl/HadoopPlannerMockJob.java | 6 ++
.../hadoop/impl/HadoopSortingTest.java | 4 +-
.../impl/HadoopTaskExecutionSelfTest.java | 10 ++--
.../hadoop/impl/HadoopTasksV1Test.java | 2 +-
.../hadoop/impl/HadoopTasksV2Test.java | 2 +-
.../hadoop/impl/HadoopTeraSortTest.java | 2 +-
.../hadoop/impl/HadoopV2JobSelfTest.java | 2 +-
.../collections/HadoopAbstractMapTest.java | 6 ++
.../HadoopExternalTaskExecutionSelfTest.java | 4 +-
20 files changed, 189 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d4a29b2..a66f501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -52,6 +52,9 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
/** User name. */
private String user;
+ /** Credentials. */
+ private byte[] credentials;
+
/**
* Default constructor required by {@link Externalizable}.
*/
@@ -69,12 +72,13 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
* @param props All other properties of the job.
*/
public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
- Map<String, String> props) {
+ Map<String, String> props, byte[] credentials) {
this.jobName = jobName;
this.user = user;
this.hasCombiner = hasCombiner;
this.numReduces = numReduces;
this.props = props;
+ this.credentials = credentials;
}
/** {@inheritDoc} */
@@ -127,6 +131,11 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
}
/** {@inheritDoc} */
+ @Override public byte[] credentials() {
+ return credentials;
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, jobName);
U.writeString(out, user);
@@ -135,6 +144,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
out.writeInt(numReduces);
IgfsUtils.writeStringMap(out, props);
+
+ U.writeByteArray(out, credentials);
}
/** {@inheritDoc} */
@@ -146,6 +157,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
numReduces = in.readInt();
props = IgfsUtils.readStringMap(in);
+
+ credentials = U.readByteArray(in);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 4cc8f80..3dffbc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -69,6 +69,13 @@ public interface HadoopJobInfo {
String user();
/**
+ * Gets credentials.
+ *
+ * @return Credentials.
+ */
+ byte[] credentials();
+
+ /**
* Creates new job instance for the given ID.
* {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution.
* This method will be called once for the same ID on one node, though it can be called on the same host
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
index 767e10a..89c60b9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.impl;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedBytes;
+import java.io.DataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
@@ -27,6 +28,8 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
@@ -211,10 +214,12 @@ public class HadoopUtils {
* Creates JobInfo from hadoop configuration.
*
* @param cfg Hadoop configuration.
+ * @param credentials Credentials.
* @return Job info.
* @throws IgniteCheckedException If failed.
*/
- public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
+ public static HadoopDefaultJobInfo createJobInfo(Configuration cfg, byte[] credentials)
+ throws IgniteCheckedException {
JobConf jobConf = new JobConf(cfg);
boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
@@ -269,7 +274,8 @@ public class HadoopUtils {
for (Map.Entry<String, String> entry : jobConf)
props.put(entry.getKey(), entry.getValue());
- return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
+ return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props,
+ credentials);
}
/**
@@ -394,4 +400,39 @@ public class HadoopUtils {
return len1 - len2;
}
+
+ /**
+ * Deserialization of Hadoop Writable object.
+ *
+ * @param writable Writable object to deserialize to.
+ * @param bytes byte array to deserialize.
+ */
+ public static void deserialize(Writable writable, byte[] bytes) throws IOException {
+ DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
+
+ writable.readFields(dataIn);
+
+ dataIn.close();
+ }
+
+ /**
+ * Create UserGroupInformation for specified user and credentials.
+ *
+ * @param user User.
+ * @param credentialsBytes Credentials byte array.
+ */
+ public static UserGroupInformation createUGI(String user, byte[] credentialsBytes) throws IOException {
+ Credentials credentials = new Credentials();
+
+ HadoopUtils.deserialize(credentials, credentialsBytes);
+
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+
+ ugi.addCredentials(credentials);
+
+ if (credentials.numberOfTokens() > 0)
+ ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN);
+
+ return ugi;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
index 0b673e9..f48d21d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.impl.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteException;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
import org.apache.ignite.internal.util.GridStringBuilder;
@@ -41,27 +42,32 @@ public class HadoopFileSystemCacheUtils {
return new HadoopLazyConcurrentMap<>(
new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
@Override public FileSystem createValue(FsCacheKey key) throws IOException {
- try {
- assert key != null;
+ assert key != null;
- // Explicitly disable FileSystem caching:
- URI uri = key.uri();
+ // Explicitly disable FileSystem caching:
+ URI uri = key.uri();
- String scheme = uri.getScheme();
+ String scheme = uri.getScheme();
- // Copy the configuration to avoid altering the external object.
- Configuration cfg = new Configuration(key.configuration());
+ // Copy the configuration to avoid altering the external object.
+ Configuration cfg = new Configuration(key.configuration());
- String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+ String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
- cfg.setBoolean(prop, true);
+ cfg.setBoolean(prop, true);
- return FileSystem.get(uri, cfg, key.user());
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() ==
+ UserGroupInformation.AuthenticationMethod.TOKEN)
+ return FileSystem.get(uri, cfg);
+ else {
+ try {
+ return FileSystem.get(uri, cfg, key.user());
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- throw new IOException("Failed to create file system due to interrupt.", e);
+ throw new IOException("Failed to create file system due to interrupt.", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
index 7fc0e77..811b0c2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -122,8 +123,13 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
+ byte[] credentials = null;
+
+ if (ts != null)
+ credentials = WritableUtils.toByteArray(ts);
+
HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class,
- jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf));
+ jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf, credentials));
if (status == null)
throw new IOException("Failed to submit job (null status obtained): " + jobId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index 2a85cb8..28b4d6b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.hadoop.impl.v2;
+import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Splitter;
@@ -317,7 +320,7 @@ public class HadoopV2Job extends HadoopJobEx {
}
/** {@inheritDoc} */
- @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
+ @Override public void initialize(final boolean external, final UUID locNodeId) throws IgniteCheckedException {
assert locNodeId != null;
this.locNodeId = locNodeId;
@@ -325,15 +328,36 @@ public class HadoopV2Job extends HadoopJobEx {
ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
try {
- rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+ if (jobInfo.credentials() == null)
+ rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+ else {
+ UserGroupInformation ugi = HadoopUtils.createUGI(jobInfo.user(), jobInfo.credentials());
+
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override public Void run() throws Exception {
+ rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId,
+ jobId));
+
+ return null;
+ }
+ });
+ }
+ catch (IOException | InterruptedException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true)) {
- U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() + " job property is set to true; please disable " +
- "it if job tasks rely on mutable static state.");
+ U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() +
+ " job property is set to true; please disable " + "it if job tasks rely on mutable static state.");
sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId));
}
}
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
finally {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 6127822..c362b0c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
@@ -548,41 +549,58 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
/** {@inheritDoc} */
@Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
- String user = job.info().user();
+ if (job.info().credentials() == null) {
+ String user = job.info().user();
- user = IgfsUtils.fixUserName(user);
+ user = IgfsUtils.fixUserName(user);
- assert user != null;
+ assert user != null;
- String ugiUser;
+ String ugiUser;
- try {
- UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+ try {
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
- assert currUser != null;
+ assert currUser != null;
- ugiUser = currUser.getShortUserName();
- }
- catch (IOException ioe) {
- throw new IgniteCheckedException(ioe);
- }
+ ugiUser = currUser.getShortUserName();
+ }
+ catch (IOException ioe) {
+ throw new IgniteCheckedException(ioe);
+ }
- try {
- if (F.eq(user, ugiUser))
- // if current UGI context user is the same, do direct call:
- return c.call();
- else {
- UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+ try {
+ if (F.eq(user, ugiUser))
+ // if current UGI context user is the same, do direct call:
+ return c.call();
+ else {
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
+ @Override public T run() throws Exception {
+ return c.call();
+ }
+ });
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+ else {
+ try {
+ UserGroupInformation ugi = HadoopUtils.createUGI(job.info().user(), job.info().credentials());
return ugi.doAs(new PrivilegedExceptionAction<T>() {
- @Override public T run() throws Exception {
+ @Override
+ public T run() throws Exception {
return c.call();
}
});
}
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 4928e3d..fc6d7f8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -207,7 +207,7 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
index 2de2d19..d27a234 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
@@ -127,7 +127,7 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest {
}
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
- createJobInfo(job.getConfiguration())).get(30000);
+ createJobInfo(job.getConfiguration(), null)).get(30000);
assertTrue(HadoopGroupingTestState.values().isEmpty());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
index 381652e..c3b3040 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -121,7 +121,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(globalId, 1);
- grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
checkStatus(jobId, false);
@@ -168,7 +168,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(globalId, 1);
- grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
checkStatus(jobId, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 6eb16af..21b7ee2 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
job.setJarByClass(HadoopWordCount2.class);
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
index 28c8264..b3368bd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
@@ -178,5 +178,11 @@ public class HadoopPlannerMockJob extends HadoopJobEx {
return null;
}
+
+ @Override public byte[] credentials() {
+ throwUnsupported();
+
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
index 2e85cce..bb11ccb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
@@ -117,7 +117,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
X.printerrln("Data generation started.");
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration())).get(180000);
+ createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Data generation complete.");
@@ -148,7 +148,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
X.printerrln("Job started.");
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
- createJobInfo(job.getConfiguration())).get(180000);
+ createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Job complete.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
index c27a67f..2394ada 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
job.setJarByClass(getClass());
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
fut.get();
@@ -188,7 +188,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
@@ -226,7 +226,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
job.setJarByClass(getClass());
final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -313,7 +313,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
- final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
+ final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg, null));
if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
@@ -364,7 +364,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
assertFalse(killRes);
- final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
+ final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg, null));
if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
index 1d7f3e4..ca96551 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
@@ -46,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
setupFileSystems(jobConf);
- HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+ HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf, null);
UUID uuid = new UUID(0, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
index 61e3e46..0fcd358 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
@@ -65,7 +65,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile);
- HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
+ HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration(), null);
UUID uuid = new UUID(0, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index d8b74ce..46752a8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -178,7 +178,7 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
index 2c2f049..041f0bc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
@@ -80,7 +80,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
cfg.setMapOutputValueClass(Text.class);
cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
- HadoopDefaultJobInfo info = createJobInfo(cfg);
+ HadoopDefaultJobInfo info = createJobInfo(cfg, null);
final UUID uuid = UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
index b9dcae1..49be0a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
@@ -177,5 +177,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
return null;
}
+
+ @Override public byte[] credentials() {
+ assert false;
+
+ return null;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
index 0afd689..1246078 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -117,7 +117,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
job.setJarByClass(getClass());
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
fut.get();
}
@@ -153,7 +153,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
job.setJarByClass(getClass());
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
try {
fut.get();
[4/6] ignite git commit: IGNITE-6335 .NET: Thin client: cache binary
mode
Posted by sb...@apache.org.
IGNITE-6335 .NET: Thin client: cache binary mode
This closes #3114
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35e621fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35e621fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35e621fe
Branch: refs/heads/ignite-zk
Commit: 35e621fec2c2bc43206ec1f35a7de4ef7faad97f
Parents: c2b145a
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Nov 30 10:10:12 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Nov 30 10:10:12 2017 +0300
----------------------------------------------------------------------
.../binary/ClientBinaryTypeGetRequest.java | 4 +-
.../binary/ClientBinaryTypeGetResponse.java | 10 +-
.../Apache.Ignite.Core.Tests.DotNetCore.csproj | 2 +
.../Apache.Ignite.Core.Tests.csproj | 2 +
.../Cache/CacheAbstractTest.cs | 26 +
.../Client/Cache/BinaryBuilderTest.cs | 118 +++++
.../Client/Cache/CacheTest.cs | 2 +-
.../Client/Cache/CacheTestKeepBinary.cs | 499 +++++++++++++++++++
.../Client/Cache/ScanQueryTest.cs | 48 ++
.../Client/ClientTestBase.cs | 43 ++
.../Client/Cache/ICacheClient.cs | 11 +
.../Apache.Ignite.Core/Client/IIgniteClient.cs | 9 +
.../Impl/Binary/BinaryObjectBuilder.cs | 5 +-
.../Impl/Binary/BinaryProcessorClient.cs | 7 +-
.../Impl/Client/Cache/CacheClient.cs | 30 +-
.../Impl/Client/IgniteClient.cs | 7 +-
16 files changed, 804 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
index 72f9f58..06cb52c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetRequest.java
@@ -43,8 +43,8 @@ public class ClientBinaryTypeGetRequest extends ClientRequest {
/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
- BinaryTypeImpl type = (BinaryTypeImpl)ctx.kernalContext().cacheObjects().binary().type( typeId);
+ BinaryTypeImpl type = (BinaryTypeImpl)ctx.kernalContext().cacheObjects().binary().type(typeId);
- return new ClientBinaryTypeGetResponse(requestId(), type.metadata());
+ return new ClientBinaryTypeGetResponse(requestId(), type != null ? type.metadata() : null);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
index e888305..5a47c73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeGetResponse.java
@@ -37,8 +37,6 @@ public class ClientBinaryTypeGetResponse extends ClientResponse {
ClientBinaryTypeGetResponse(long requestId, BinaryMetadata meta) {
super(requestId);
- assert meta != null;
-
this.meta = meta;
}
@@ -46,6 +44,12 @@ public class ClientBinaryTypeGetResponse extends ClientResponse {
@Override public void encode(BinaryRawWriterEx writer) {
super.encode(writer);
- PlatformUtils.writeBinaryMetadata(writer, meta, true);
+ if (meta != null) {
+ writer.writeBoolean(true); // Not null.
+
+ PlatformUtils.writeBinaryMetadata(writer, meta, true);
+ } else {
+ writer.writeBoolean(false); // Null.
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
index 0acaab8..5d735eb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Apache.Ignite.Core.Tests.DotNetCore.csproj
@@ -58,7 +58,9 @@
<Compile Include="..\Apache.Ignite.Core.Tests\Cache\Query\Linq\CacheLinqTestSimpleName.cs" Link="Cache\Query\Linq\CacheLinqTestSimpleName.cs" />
<Compile Include="..\Apache.Ignite.Core.Tests\Cache\Query\Linq\CacheLinqTestSqlEscapeAll.cs" Link="Cache\Query\Linq\CacheLinqTestSqlEscapeAll.cs" />
<Compile Include="..\Apache.Ignite.Core.Tests\Cache\TestReferenceObject.cs" Link="Cache\TestReferenceObject.cs" />
+ <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\BinaryBuilderTest.cs" Link="ThinClient\Cache\BinaryBuilderTest.cs" />
<Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\CacheTest.cs" Link="ThinClient\Cache\CacheTest.cs" />
+ <Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\CacheTestKeepBinary.cs" Link="ThinClient\Cache\CacheTestKeepBinary.cs" />
<Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\EmptyObject.cs" Link="ThinClient\Cache\EmptyObject.cs" />
<Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\Person.cs" Link="ThinClient\Cache\Person.cs" />
<Compile Include="..\Apache.Ignite.Core.Tests\Client\Cache\ScanQueryTest.cs" Link="ThinClient\Cache\ScanQueryTest.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 3951b40..8bd8f28 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -106,7 +106,9 @@
<Compile Include="Cache\Query\Linq\CacheLinqTest.Contains.cs" />
<Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" />
<Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" />
+ <Compile Include="Client\Cache\BinaryBuilderTest.cs" />
<Compile Include="Client\Cache\CacheTest.cs" />
+ <Compile Include="Client\Cache\CacheTestKeepBinary.cs" />
<Compile Include="Client\Cache\CacheTestNoMeta.cs" />
<Compile Include="Client\Cache\ClientCacheConfigurationTest.cs" />
<Compile Include="Client\Cache\EmptyObject.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
index d54363a..fcce021 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
@@ -515,6 +515,32 @@ namespace Apache.Ignite.Core.Tests.Cache
}
[Test]
+ [Ignore("IGNITE-7072")]
+ public void TestReplaceBinary()
+ {
+ var cache = Cache<object, object>();
+ var key = new {Foo = "bar"};
+ var val = new {Bar = "baz", Id = 1};
+ var val2 = new {Bar = "baz2", Id = 2};
+ var val3 = new {Bar = "baz3", Id = 3};
+
+ Assert.IsFalse(cache.ContainsKey(key));
+ Assert.AreEqual(false, cache.Replace(key, val));
+ Assert.IsFalse(cache.ContainsKey(key));
+
+ cache.Put(key, val);
+ Assert.AreEqual(val, cache.Get(key));
+ Assert.IsTrue(cache.Replace(key, val2));
+ Assert.AreEqual(val2, cache.Get(key));
+
+ Assert.IsFalse(cache.Replace(key, -1, 3));
+ Assert.AreEqual(val2, cache.Get(key));
+
+ Assert.IsTrue(cache.Replace(key, val2, val3));
+ Assert.AreEqual(val3, cache.Get(key));
+ }
+
+ [Test]
public void TestGetAndReplaceAsync()
{
var cache = Cache().WrapAsync();
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs
new file mode 100644
index 0000000..76c57f6
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/BinaryBuilderTest.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System;
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests binary object builder in thin client.
+ /// </summary>
+ public class BinaryBuilderTest : ClientTestBase
+ {
+ /// <summary>
+ /// Tests the classless object builder.
+ /// </summary>
+ [Test]
+ public void TestClasslessBuilder()
+ {
+ var bin = Client.GetBinary();
+
+ var obj = bin.GetBuilder("FooBarBaz")
+ .SetByteField("code", 99)
+ .SetStringField("name", "abc")
+ .Build();
+
+ var cache = GetBinaryCache();
+ cache[1] = obj;
+ var res = cache.Get(1);
+
+ Assert.AreEqual("abc", res.GetField<string>("name"));
+ Assert.AreEqual(99, res.GetField<byte>("code"));
+ Assert.IsNull(res.GetField<object>("field"));
+
+ var type = res.GetBinaryType();
+ Assert.AreEqual("FooBarBaz", type.TypeName);
+ Assert.IsFalse(type.IsEnum);
+
+ CollectionAssert.AreEquivalent(new[] { "code", "name" }, type.Fields);
+ Assert.AreEqual("byte", type.GetFieldTypeName("code"));
+ Assert.AreEqual("String", type.GetFieldTypeName("name"));
+
+ Assert.AreEqual(type.TypeId, bin.GetBinaryType("FooBarBaz").TypeId);
+ Assert.AreEqual(type.TypeName, bin.GetBinaryType(type.TypeId).TypeName);
+ }
+
+ /// <summary>
+ /// Tests the builder with existing class.
+ /// </summary>
+ [Test]
+ public void TestPersonBuilder()
+ {
+ var fullCache = GetCache<Person>();
+ var cache = GetBinaryCache();
+ cache[1] = GetBinaryPerson(1);
+
+ // Modify.
+ cache[1] = cache[1].ToBuilder().SetField("Name", "Baz").Build();
+ Assert.AreEqual("Baz", fullCache[1].Name);
+
+ // Build from scratch.
+ cache[2] = Client.GetBinary().GetBuilder(typeof(Person).FullName)
+ .SetIntField("Id", 25)
+ .SetStringField("Name", "Joe")
+ .Build();
+
+ Assert.AreEqual(25, fullCache[2].Id);
+ Assert.AreEqual("Joe", fullCache[2].Name);
+
+ // Meta.
+ Assert.AreEqual(cache[2].GetBinaryType().TypeId, Client.GetBinary().GetBinaryType(typeof(Person)).TypeId);
+ }
+
+ /// <summary>
+ /// Tests the enum builder.
+ /// </summary>
+ [Test]
+ public void TestEnumBuilder()
+ {
+ var bin = Client.GetBinary();
+ var cache = GetBinaryCache();
+
+ cache[1] = bin.BuildEnum(typeof(CacheMode), "Replicated");
+ Assert.AreEqual((int) CacheMode.Replicated, cache[1].EnumValue);
+
+ Assert.Throws<NotSupportedException>(() => bin.RegisterEnum("MyEnum", new Dictionary<string, int>
+ {
+ {"Foo", 1},
+ {"Bar", 2}
+ }));
+ }
+
+ /// <summary>
+ /// Tests binary types retrieval.
+ /// </summary>
+ [Test]
+ public void TestGetBinaryTypes()
+ {
+ Assert.Throws<NotSupportedException>(() => Client.GetBinary().GetBinaryTypes());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
index 106f448..0d82479 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
@@ -472,7 +472,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
/// Tests the Replace overload with additional argument.
/// </summary>
[Test]
- public void TestReplace2()
+ public void TestReplaceIfEquals()
{
using (var client = GetClient())
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs
new file mode 100644
index 0000000..21d3d7c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestKeepBinary.cs
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Apache.Ignite.Core.Binary;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Thin client cache test in binary mode.
+ /// </summary>
+ public sealed class CacheTestKeepBinary : ClientTestBase
+ {
+ /// <summary>
+ /// Tests the cache put / get with primitive data types.
+ /// </summary>
+ [Test]
+ public void TestPutGetPrimitives()
+ {
+ using (var client = GetClient())
+ {
+ GetCache<string>().Put(1, "foo");
+
+ var clientCache = client.GetCache<int?, string>(CacheName)
+ .WithKeepBinary<int?, string>();
+
+ clientCache.Put(2, "bar");
+ clientCache[3] = "baz";
+
+ // Existing key.
+ Assert.AreEqual("foo", clientCache.Get(1));
+ Assert.AreEqual("foo", clientCache[1]);
+ Assert.AreEqual("bar", clientCache[2]);
+ Assert.AreEqual("baz", clientCache[3]);
+
+ // Missing key.
+ Assert.Throws<KeyNotFoundException>(() => clientCache.Get(-1));
+
+ // Null key.
+ Assert.Throws<ArgumentNullException>(() => clientCache.Get(null));
+
+ // Null vs 0.
+ var intCache = client.GetCache<int?, int?>(CacheName);
+ intCache.Put(1, 0);
+ Assert.AreEqual(0, intCache.Get(1));
+ }
+ }
+
+ /// <summary>
+ /// Tests the cache put / get for Empty object type.
+ /// </summary>
+ [Test]
+ public void TestPutGetEmptyObject()
+ {
+ using (var client = GetClient())
+ {
+ var serverCache = GetCache<EmptyObject>();
+ var clientCache = client.GetCache<int, EmptyObject>(CacheName)
+ .WithKeepBinary<int, IBinaryObject>();
+
+ serverCache.Put(1, new EmptyObject());
+
+ var res = clientCache.Get(1);
+ Assert.IsNotNull(res);
+ Assert.IsInstanceOf<EmptyObject>(res.Deserialize<object>());
+ }
+ }
+
+ /// <summary>
+ /// Tests the cache put / get with user data types.
+ /// </summary>
+ [Test]
+ public void TestPutGetUserObjects([Values(true, false)] bool compactFooter)
+ {
+ var cfg = GetClientConfiguration();
+
+ cfg.BinaryConfiguration = new BinaryConfiguration
+ {
+ CompactFooter = compactFooter
+ };
+
+ using (var client = Ignition.StartClient(cfg))
+ {
+ var person = new Person {Id = 100, Name = "foo"};
+ var person2 = new Person2 {Id = 200, Name = "bar"};
+
+ var serverCache = GetCache<Person>();
+ var clientCache = client.GetCache<int?, Person>(CacheName)
+ .WithKeepBinary<int?, IBinaryObject>();
+
+ Assert.AreEqual(CacheName, clientCache.Name);
+
+ // Put through server cache.
+ serverCache.Put(1, person);
+
+ // Put through client cache.
+ var binPerson = client.GetBinary().ToBinary<IBinaryObject>(person2);
+ clientCache.Put(2, binPerson);
+
+ // Read from client cache.
+ Assert.AreEqual("foo", clientCache.Get(1).GetField<string>("Name"));
+ Assert.AreEqual(100, clientCache[1].GetField<int>("Id"));
+ Assert.AreEqual(200, clientCache[2].GetField<int>("Id"));
+
+ // Read from server cache.
+ Assert.AreEqual("foo", serverCache.Get(1).Name);
+ Assert.AreEqual(100, serverCache[1].Id);
+ Assert.AreEqual(200, serverCache[2].Id);
+
+ // Null key or value.
+ Assert.Throws<ArgumentNullException>(() => clientCache.Put(10, null));
+ Assert.Throws<ArgumentNullException>(() => clientCache.Put(null, binPerson));
+ }
+ }
+
+ /// <summary>
+ /// Tests the GetAll method.
+ /// </summary>
+ [Test]
+ public void TestGetAll()
+ {
+ var cache = GetBinaryCache();
+ cache[1] = GetBinaryPerson(1);
+ cache[2] = GetBinaryPerson(2);
+
+ var res = cache.GetAll(new [] {1}).Single();
+ Assert.AreEqual(1, res.Key);
+ Assert.AreEqual(1, res.Value.GetField<int>("Id"));
+
+ res = cache.GetAll(new [] {1, -1}).Single();
+ Assert.AreEqual(1, res.Key);
+ Assert.AreEqual(1, res.Value.GetField<int>("Id"));
+
+ CollectionAssert.AreEquivalent(new[] {1, 2},
+ cache.GetAll(new [] {1, 2, 3}).Select(x => x.Value.GetField<int>("Id")));
+ }
+
+ /// <summary>
+ /// Tests the GetAndPut method.
+ /// </summary>
+ [Test]
+ public void TestGetAndPut()
+ {
+ var cache = GetBinaryCache();
+
+ Assert.IsFalse(cache.ContainsKey(1));
+
+ var res = cache.GetAndPut(1, GetBinaryPerson(1));
+ Assert.IsFalse(res.Success);
+ Assert.IsNull(res.Value);
+
+ Assert.IsTrue(cache.ContainsKey(1));
+
+ res = cache.GetAndPut(1, GetBinaryPerson(2));
+ Assert.IsTrue(res.Success);
+ Assert.AreEqual(GetBinaryPerson(1), res.Value);
+
+ Assert.AreEqual(GetBinaryPerson(2), cache[1]);
+ }
+
+ /// <summary>
+ /// Tests the GetAndReplace method.
+ /// </summary>
+ [Test]
+ public void TestGetAndReplace()
+ {
+ var cache = GetBinaryCache();
+
+ Assert.IsFalse(cache.ContainsKey(1));
+
+ var res = cache.GetAndReplace(1, GetBinaryPerson(1));
+ Assert.IsFalse(res.Success);
+ Assert.IsNull(res.Value);
+
+ Assert.IsFalse(cache.ContainsKey(1));
+ cache[1] = GetBinaryPerson(1);
+
+ res = cache.GetAndReplace(1, GetBinaryPerson(2));
+ Assert.IsTrue(res.Success);
+ Assert.AreEqual(GetBinaryPerson(1), res.Value);
+
+ Assert.AreEqual(GetBinaryPerson(2), cache[1]);
+ }
+
+ /// <summary>
+ /// Tests the GetAndRemove method.
+ /// </summary>
+ [Test]
+ public void TestGetAndRemove()
+ {
+ var cache = GetBinaryCache();
+
+ Assert.IsFalse(cache.ContainsKey(1));
+
+ var res = cache.GetAndRemove(1);
+ Assert.IsFalse(res.Success);
+ Assert.IsNull(res.Value);
+
+ Assert.IsFalse(cache.ContainsKey(1));
+ cache[1] = GetBinaryPerson(1);
+
+ res = cache.GetAndRemove(1);
+ Assert.IsTrue(res.Success);
+ Assert.AreEqual(GetBinaryPerson(1), res.Value);
+
+ Assert.IsFalse(cache.ContainsKey(1));
+ }
+
+ /// <summary>
+ /// Tests the ContainsKey method.
+ /// </summary>
+ [Test]
+ public void TestContainsKey()
+ {
+ var cache = GetBinaryKeyCache();
+
+ cache[GetBinaryPerson(25)] = 1;
+
+ Assert.IsTrue(cache.ContainsKey(GetBinaryPerson(25)));
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(26)));
+
+ Assert.Throws<ArgumentNullException>(() => cache.ContainsKey(null));
+ }
+
+ /// <summary>
+ /// Tests the ContainsKeys method.
+ /// </summary>
+ [Test]
+ public void TestContainsKeys()
+ {
+ var cache = GetBinaryKeyCache();
+
+ cache[GetBinaryPerson(1)] = 1;
+ cache[GetBinaryPerson(2)] = 2;
+ cache[GetBinaryPerson(3)] = 3;
+
+ Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1)}));
+ Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1), GetBinaryPerson(2)}));
+ Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(2), GetBinaryPerson(1)}));
+ Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1), GetBinaryPerson(2), GetBinaryPerson(3)}));
+ Assert.IsTrue(cache.ContainsKeys(new[] {GetBinaryPerson(1), GetBinaryPerson(3), GetBinaryPerson(2)}));
+
+ Assert.IsFalse(cache.ContainsKeys(new[] { GetBinaryPerson(0) }));
+ Assert.IsFalse(cache.ContainsKeys(new[] { GetBinaryPerson(0), GetBinaryPerson(1) }));
+ Assert.IsFalse(cache.ContainsKeys(new[] { GetBinaryPerson(1), GetBinaryPerson(0) }));
+ Assert.IsFalse(cache.ContainsKeys(new[]
+ {GetBinaryPerson(1), GetBinaryPerson(3), GetBinaryPerson(2), GetBinaryPerson(0)}));
+
+ Assert.Throws<ArgumentNullException>(() => cache.ContainsKeys(null));
+ }
+
+ /// <summary>
+ /// Tests the PutIfAbsent method.
+ /// </summary>
+ [Test]
+ public void TestPutIfAbsent()
+ {
+ var cache = GetBinaryKeyCache();
+
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+ var res = cache.PutIfAbsent(GetBinaryPerson(1), 1);
+ Assert.IsTrue(res);
+ Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+ res = cache.PutIfAbsent(GetBinaryPerson(1), 2);
+ Assert.IsFalse(res);
+ Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+ Assert.Throws<ArgumentNullException>(() => cache.PutIfAbsent(null, 1));
+ }
+
+ /// <summary>
+ /// Tests the GetAndPutIfAbsent method.
+ /// </summary>
+ [Test]
+ public void TestGetAndPutIfAbsent()
+ {
+ var cache = GetBinaryKeyCache();
+
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+ var res = cache.GetAndPutIfAbsent(GetBinaryPerson(1), 1);
+ Assert.IsFalse(res.Success);
+ Assert.AreEqual(0, res.Value);
+ Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+ res = cache.GetAndPutIfAbsent(GetBinaryPerson(1), 2);
+ Assert.IsTrue(res.Success);
+ Assert.AreEqual(1, res.Value);
+ Assert.AreEqual(1, cache[GetBinaryPerson(1)]);
+
+ Assert.Throws<ArgumentNullException>(() => cache.GetAndPutIfAbsent(null, 1));
+ }
+
+ /// <summary>
+ /// Tests the Replace method.
+ /// </summary>
+ [Test]
+ public void TestReplace()
+ {
+ var cache = GetBinaryKeyCache();
+
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+ var res = cache.Replace(GetBinaryPerson(1), 1);
+ Assert.IsFalse(res);
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+
+ cache[GetBinaryPerson(1)] = 1;
+
+ res = cache.Replace(GetBinaryPerson(1), 2);
+ Assert.IsTrue(res);
+ Assert.AreEqual(2, cache[GetBinaryPerson(1)]);
+
+ Assert.Throws<ArgumentNullException>(() => cache.Replace(null, 1));
+ }
+
+ /// <summary>
+ /// Tests the Replace overload with additional argument.
+ /// </summary>
+ [Test]
+ [Ignore("IGNITE-7072")]
+ public void TestReplaceIfEquals()
+ {
+ var cache = GetBinaryCache();
+
+ Assert.IsFalse(cache.ContainsKey(1));
+
+ var res = cache.Replace(1, GetBinaryPerson(1), GetBinaryPerson(2));
+ Assert.IsFalse(res);
+ Assert.IsFalse(cache.ContainsKey(1));
+
+ cache[1] = GetBinaryPerson(1);
+
+ res = cache.Replace(1, GetBinaryPerson(-1), GetBinaryPerson(2));
+ Assert.IsFalse(res);
+ Assert.AreEqual(1, cache[1]);
+
+ res = cache.Replace(1, GetBinaryPerson(1), GetBinaryPerson(2));
+ Assert.IsTrue(res);
+ Assert.AreEqual(GetBinaryPerson(2), cache[1]);
+ }
+
+ /// <summary>
+ /// Tests the PutAll method.
+ /// </summary>
+ [Test]
+ public void TestPutAll()
+ {
+ var cache = GetBinaryCache();
+
+ var keys = Enumerable.Range(1, 10).ToArray();
+ cache.PutAll(keys.ToDictionary(x => x, GetBinaryPerson));
+
+ Assert.AreEqual(keys, cache.GetAll(keys).Select(x => x.Value.GetField<int>("Id")));
+ }
+
+ /// <summary>
+ /// Tests the Clear method with a key argument.
+ /// </summary>
+ [Test]
+ public void TestClearKey()
+ {
+ var cache = GetBinaryKeyCache();
+
+ cache[GetBinaryPerson(1)] = 1;
+ cache[GetBinaryPerson(2)] = 2;
+
+ cache.Clear(GetBinaryPerson(1));
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+ Assert.IsTrue(cache.ContainsKey(GetBinaryPerson(2)));
+
+ cache.Clear(GetBinaryPerson(2));
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(2)));
+ }
+
+ /// <summary>
+ /// Tests the Remove method.
+ /// </summary>
+ [Test]
+ public void TestRemove()
+ {
+ var cache = GetBinaryKeyCache();
+
+ cache[GetBinaryPerson(1)] = 1;
+ cache[GetBinaryPerson(2)] = 2;
+
+ var res = cache.Remove(GetBinaryPerson(1));
+ Assert.IsTrue(res);
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+ Assert.IsTrue(cache.ContainsKey(GetBinaryPerson(2)));
+
+ res = cache.Remove(GetBinaryPerson(2));
+ Assert.IsTrue(res);
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(1)));
+ Assert.IsFalse(cache.ContainsKey(GetBinaryPerson(2)));
+
+ res = cache.Remove(GetBinaryPerson(-1));
+ Assert.IsFalse(res);
+
+ Assert.Throws<ArgumentNullException>(() => cache.Remove(null));
+ }
+
+ /// <summary>
+ /// Tests the Remove method with value argument.
+ /// </summary>
+ [Test]
+ [Ignore("IGNITE-7072")]
+ public void TestRemoveKeyVal()
+ {
+ var cache = GetBinaryKeyValCache();
+
+ var x = GetBinaryPerson(1);
+ var y = GetBinaryPerson(2);
+ var z = GetBinaryPerson(0);
+
+ cache[x] = x;
+ cache[y] = y;
+
+ var res = cache.Remove(x, z);
+ Assert.IsFalse(res);
+
+ res = cache.Remove(z, z);
+ Assert.IsFalse(res);
+
+ res = cache.Remove(x, x);
+ Assert.IsTrue(res);
+ Assert.IsFalse(cache.ContainsKey(x));
+ Assert.IsTrue(cache.ContainsKey(y));
+
+ res = cache.Remove(y, y);
+ Assert.IsTrue(res);
+ Assert.IsFalse(cache.ContainsKey(x));
+ Assert.IsFalse(cache.ContainsKey(y));
+
+ res = cache.Remove(y, y);
+ Assert.IsFalse(res);
+ }
+
+ /// <summary>
+ /// Tests the RemoveAll with a set of keys.
+ /// </summary>
+ [Test]
+ public void TestRemoveReys()
+ {
+ var cache = GetBinaryKeyCache();
+
+ var ids = Enumerable.Range(1, 10).ToArray();
+ var keys = ids.Select(GetBinaryPerson).ToArray();
+ cache.PutAll(ids.ToDictionary(GetBinaryPerson, x => x));
+
+ cache.RemoveAll(keys.Skip(2));
+ CollectionAssert.AreEquivalent(keys.Take(2), cache.GetAll(keys).Select(x => x.Key));
+
+ cache.RemoveAll(new[] {GetBinaryPerson(1)});
+ Assert.AreEqual(2, cache.GetAll(keys).Single().Value);
+
+ cache.RemoveAll(keys);
+ cache.RemoveAll(keys);
+
+ Assert.AreEqual(0, cache.GetSize());
+
+ Assert.Throws<ArgumentNullException>(() => cache.RemoveAll(null));
+ }
+
+ /// <summary>
+ /// Tests the WithKeepBinary logic.
+ /// </summary>
+ [Test]
+ public void TestWithKeepBinary()
+ {
+ var cache = GetBinaryCache();
+ var cache2 = cache.WithKeepBinary<int, IBinaryObject>();
+
+ Assert.AreSame(cache, cache2);
+
+ Assert.Throws<InvalidOperationException>(() => cache.WithKeepBinary<int, object>());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
index 0660a20..354b869 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ScanQueryTest.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Client;
@@ -136,6 +137,26 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
}
}
+ /// <summary>
+ /// Tests scan query with .NET filter in binary mode.
+ /// </summary>
+ [Test]
+ public void TestWithFilterBinary()
+ {
+ GetPersonCache();
+
+ using (var client = GetClient())
+ {
+ var clientCache = client.GetCache<int, Person>(CacheName);
+ var binCache = clientCache.WithKeepBinary<int, IBinaryObject>();
+
+ // One result.
+ var single = binCache.Query(new ScanQuery<int, IBinaryObject>(new PersonIdFilterBinary(8))).Single();
+ Assert.AreEqual(8, single.Key);
+ }
+ }
+
+
#if !NETCOREAPP2_0 // Serializing delegates and exceptions is not supported on this platform.
/// <summary>
/// Tests the exception in filter.
@@ -195,6 +216,10 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
Assert.IsTrue(cur2.MoveNext());
Assert.IsTrue(cur3.MoveNext());
+ Assert.IsNotNull(cur1.Current);
+ Assert.IsNotNull(cur2.Current);
+ Assert.IsNotNull(cur3.Current);
+
Assert.AreEqual(cur1.Current.Key, cur2.Current.Key);
Assert.AreEqual(cur1.Current.Key, cur3.Current.Key);
}
@@ -289,5 +314,28 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
return entry.Key == _key;
}
}
+
+ /// <summary>
+ /// Person filter.
+ /// </summary>
+ private class PersonIdFilterBinary : ICacheEntryFilter<int, IBinaryObject>
+ {
+ /** Key. */
+ private readonly int _id;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PersonFilter"/> class.
+ /// </summary>
+ public PersonIdFilterBinary(int id)
+ {
+ _id = id;
+ }
+
+ /** <inheritdoc /> */
+ public bool Invoke(ICacheEntry<int, IBinaryObject> entry)
+ {
+ return entry.Value.GetField<int>("Id") == _id;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
index e1d30b9..6177f34 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
@@ -17,10 +17,13 @@
namespace Apache.Ignite.Core.Tests.Client
{
+ using System;
using System.Net;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.Core.Tests.Client.Cache;
using NUnit.Framework;
/// <summary>
@@ -134,5 +137,45 @@ namespace Apache.Ignite.Core.Tests.Client
{
return TestUtils.GetTestConfiguration();
}
+
+ /// <summary>
+ /// Converts object to binary form.
+ /// </summary>
+ protected IBinaryObject ToBinary(object o)
+ {
+ return Client.GetBinary().ToBinary<IBinaryObject>(o);
+ }
+
+ /// <summary>
+ /// Gets the binary cache.
+ /// </summary>
+ protected ICacheClient<int, IBinaryObject> GetBinaryCache()
+ {
+ return Client.GetCache<int, Person>(CacheName).WithKeepBinary<int, IBinaryObject>();
+ }
+
+ /// <summary>
+ /// Gets the binary key cache.
+ /// </summary>
+ protected ICacheClient<IBinaryObject, int> GetBinaryKeyCache()
+ {
+ return Client.GetCache<Person, int>(CacheName).WithKeepBinary<IBinaryObject, int>();
+ }
+
+ /// <summary>
+ /// Gets the binary key-val cache.
+ /// </summary>
+ protected ICacheClient<IBinaryObject, IBinaryObject> GetBinaryKeyValCache()
+ {
+ return Client.GetCache<Person, Person>(CacheName).WithKeepBinary<IBinaryObject, IBinaryObject>();
+ }
+
+ /// <summary>
+ /// Gets the binary person.
+ /// </summary>
+ protected IBinaryObject GetBinaryPerson(int id)
+ {
+ return ToBinary(new Person(id) { DateTime = DateTime.MinValue.ToUniversalTime() });
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
index eb91b0a..47b780d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Cache/ICacheClient.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Core.Client.Cache
{
+ using System;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Query;
@@ -239,5 +240,15 @@ namespace Apache.Ignite.Core.Client.Cache
/// Gets the cache configuration.
/// </summary>
CacheClientConfiguration GetConfiguration();
+
+ /// <summary>
+ /// Gets cache with KeepBinary mode enabled, changing key and/or value types if necessary.
+ /// You can only change key/value types when transitioning from non-binary to binary cache;
+ /// Changing type of binary cache is not allowed and will throw an <see cref="InvalidOperationException"/>.
+ /// </summary>
+ /// <typeparam name="TK1">Key type in binary mode.</typeparam>
+ /// <typeparam name="TV1">Value type in binary mode.</typeparam>
+ /// <returns>Cache instance with binary mode enabled.</returns>
+ ICacheClient<TK1, TV1> WithKeepBinary<TK1, TV1>();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
index d5ba835..9bca883 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
@@ -19,6 +19,8 @@ namespace Apache.Ignite.Core.Client
{
using System;
using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Client.Cache;
/// <summary>
@@ -90,5 +92,12 @@ namespace Apache.Ignite.Core.Client
/// </summary>
/// <param name="name">The name of the cache to stop.</param>
void DestroyCache(string name);
+
+ /// <summary>
+ /// Gets Ignite binary services.
+ /// </summary>
+ /// <returns>Instance of <see cref="IBinary"/> interface</returns>
+ [SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Semantics.")]
+ IBinary GetBinary();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
index fac7d18..7d62f3a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectBuilder.cs
@@ -537,10 +537,7 @@ namespace Apache.Ignite.Core.Impl.Binary
// 3. Handle metadata.
if (metaHnd != null)
{
- IDictionary<string, BinaryField> meta = metaHnd.OnObjectWriteFinished();
-
- if (meta != null)
- _parent._ctx.Writer.SaveMetadata(desc, meta);
+ _parent._ctx.Writer.SaveMetadata(desc, metaHnd.OnObjectWriteFinished());
}
}
finally
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
index 816e24a..febecd4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessorClient.cs
@@ -17,7 +17,6 @@
namespace Apache.Ignite.Core.Impl.Binary
{
- using System;
using System.Collections.Generic;
using System.Diagnostics;
using Apache.Ignite.Core.Binary;
@@ -53,13 +52,13 @@ namespace Apache.Ignite.Core.Impl.Binary
public BinaryType GetBinaryType(int typeId)
{
return _socket.DoOutInOp(ClientOp.BinaryTypeGet, s => s.WriteInt(typeId),
- s => new BinaryType(_marsh.StartUnmarshal(s), true));
+ s => s.ReadBool() ? new BinaryType(_marsh.StartUnmarshal(s), true) : null);
}
/** <inheritdoc /> */
public List<IBinaryType> GetBinaryTypes()
{
- throw new NotSupportedException();
+ throw IgniteClient.GetClientNotSupportedException();
}
/** <inheritdoc /> */
@@ -96,7 +95,7 @@ namespace Apache.Ignite.Core.Impl.Binary
/** <inheritdoc /> */
public BinaryType RegisterEnum(string typeName, IEnumerable<KeyValuePair<string, int>> values)
{
- throw new NotSupportedException();
+ throw IgniteClient.GetClientNotSupportedException();
}
/** <inheritdoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
index 76c7b00..0ed2c6d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
@@ -55,14 +55,15 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
private readonly Marshaller _marsh;
/** Keep binary flag. */
- private readonly bool _keepBinary = false;
+ private readonly bool _keepBinary;
/// <summary>
/// Initializes a new instance of the <see cref="CacheClient{TK, TV}" /> class.
/// </summary>
/// <param name="ignite">Ignite.</param>
/// <param name="name">Cache name.</param>
- public CacheClient(IgniteClient ignite, string name)
+ /// <param name="keepBinary">Binary mode flag.</param>
+ public CacheClient(IgniteClient ignite, string name, bool keepBinary = false)
{
Debug.Assert(ignite != null);
Debug.Assert(name != null);
@@ -71,6 +72,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
_ignite = ignite;
_marsh = _ignite.Marshaller;
_id = BinaryUtils.GetCacheId(name);
+ _keepBinary = keepBinary;
}
/** <inheritDoc /> */
@@ -358,6 +360,26 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
return DoOutInOp(ClientOp.CacheGetConfiguration, null, s => new CacheClientConfiguration(s));
}
+ /** <inheritDoc /> */
+ public ICacheClient<TK1, TV1> WithKeepBinary<TK1, TV1>()
+ {
+ if (_keepBinary)
+ {
+ var result = this as ICacheClient<TK1, TV1>;
+
+ if (result == null)
+ {
+ throw new InvalidOperationException(
+ "Can't change type of binary cache. WithKeepBinary has been called on an instance of " +
+ "binary cache with incompatible generic arguments.");
+ }
+
+ return result;
+ }
+
+ return new CacheClient<TK1, TV1>(_ignite, _name, true);
+ }
+
/// <summary>
/// Does the out in op.
/// </summary>
@@ -402,7 +424,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
stream.Seek(-1, SeekOrigin.Current);
- return _marsh.Unmarshal<T>(stream);
+ return _marsh.Unmarshal<T>(stream, _keepBinary);
}
/// <summary>
@@ -419,7 +441,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
stream.Seek(-1, SeekOrigin.Current);
- return new CacheResult<T>(_marsh.Unmarshal<T>(stream));
+ return new CacheResult<T>(_marsh.Unmarshal<T>(stream, _keepBinary));
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/35e621fe/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
index 2dd18cc..10bf38f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
@@ -47,6 +47,9 @@ namespace Apache.Ignite.Core.Impl.Client
/** Binary processor. */
private readonly IBinaryProcessor _binProc;
+ /** Binary. */
+ private readonly IBinary _binary;
+
/// <summary>
/// Initializes a new instance of the <see cref="IgniteClient"/> class.
/// </summary>
@@ -63,6 +66,8 @@ namespace Apache.Ignite.Core.Impl.Client
};
_binProc = clientConfiguration.BinaryProcessor ?? new BinaryProcessorClient(_socket);
+
+ _binary = new Binary(_marsh);
}
/// <summary>
@@ -154,7 +159,7 @@ namespace Apache.Ignite.Core.Impl.Client
/** <inheritDoc /> */
public IBinary GetBinary()
{
- throw GetClientNotSupportedException();
+ return _binary;
}
/** <inheritDoc /> */
[5/6] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-zk
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47971818
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47971818
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47971818
Branch: refs/heads/ignite-zk
Commit: 47971818e2e8dddb63e3714f1745246fc26dcbc3
Parents: c1e2033 35e621f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 30 11:59:58 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 30 11:59:58 2017 +0300
----------------------------------------------------------------------
.../wal/FileWriteAheadLogManager.java | 22 +-
.../processors/hadoop/HadoopDefaultJobInfo.java | 15 +-
.../processors/hadoop/HadoopJobInfo.java | 7 +
.../binary/ClientBinaryTypeGetRequest.java | 4 +-
.../binary/ClientBinaryTypeGetResponse.java | 10 +-
.../processors/hadoop/impl/HadoopUtils.java | 45 +-
.../impl/fs/HadoopFileSystemCacheUtils.java | 34 +-
.../hadoop/impl/proto/HadoopClientProtocol.java | 8 +-
.../processors/hadoop/impl/v2/HadoopV2Job.java | 32 +-
.../hadoop/impl/v2/HadoopV2TaskContext.java | 62 ++-
.../impl/HadoopAbstractMapReduceTest.java | 2 +-
.../hadoop/impl/HadoopGroupingTest.java | 2 +-
.../hadoop/impl/HadoopJobTrackerSelfTest.java | 4 +-
.../impl/HadoopMapReduceEmbeddedSelfTest.java | 2 +-
.../hadoop/impl/HadoopPlannerMockJob.java | 6 +
.../hadoop/impl/HadoopSortingTest.java | 4 +-
.../impl/HadoopTaskExecutionSelfTest.java | 10 +-
.../hadoop/impl/HadoopTasksV1Test.java | 2 +-
.../hadoop/impl/HadoopTasksV2Test.java | 2 +-
.../hadoop/impl/HadoopTeraSortTest.java | 2 +-
.../hadoop/impl/HadoopV2JobSelfTest.java | 2 +-
.../collections/HadoopAbstractMapTest.java | 6 +
.../HadoopExternalTaskExecutionSelfTest.java | 4 +-
.../Apache.Ignite.Core.Tests.DotNetCore.csproj | 2 +
.../Apache.Ignite.Core.Tests.csproj | 2 +
.../Cache/CacheAbstractTest.cs | 26 +
.../Client/Cache/BinaryBuilderTest.cs | 118 +++++
.../Client/Cache/CacheTest.cs | 2 +-
.../Client/Cache/CacheTestKeepBinary.cs | 499 +++++++++++++++++++
.../Client/Cache/ScanQueryTest.cs | 48 ++
.../Client/ClientTestBase.cs | 43 ++
.../Client/Cache/ICacheClient.cs | 11 +
.../Apache.Ignite.Core/Client/IIgniteClient.cs | 9 +
.../Impl/Binary/BinaryObjectBuilder.cs | 5 +-
.../Impl/Binary/BinaryProcessorClient.cs | 7 +-
.../Impl/Client/Cache/CacheClient.cs | 30 +-
.../Impl/Client/IgniteClient.cs | 7 +-
37 files changed, 1007 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
[2/6] ignite git commit: Merge remote-tracking branch 'origin/master'
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fa9de3e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fa9de3e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fa9de3e
Branch: refs/heads/ignite-zk
Commit: 5fa9de3ef302937e2cea22a2f0f29c957e5c7313
Parents: 438760e 89c82f5
Author: devozerov <vo...@gridgain.com>
Authored: Wed Nov 29 13:50:33 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:33 2017 +0300
----------------------------------------------------------------------
.../cache/binary/BinaryMetadataTransport.java | 20 ++-
.../continuous/GridContinuousProcessor.java | 132 ++++++++++++-------
2 files changed, 93 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
[3/6] ignite git commit: IGNITE-7016 Avoid WAL segment fsync on
header write in non-default mode
Posted by sb...@apache.org.
IGNITE-7016 Avoid WAL segment fsync on header write in non-default mode
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2b145a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2b145a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2b145a1
Branch: refs/heads/ignite-zk
Commit: c2b145a1b14160dcf645d7ceddb2b5d58b40b43a
Parents: 5fa9de3
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Nov 29 19:13:20 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 29 19:13:20 2017 +0300
----------------------------------------------------------------------
.../wal/FileWriteAheadLogManager.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2b145a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a450521..948a8ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1304,16 +1304,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
synchronized (this) {
while (locked.containsKey(toArchive) && !stopped)
wait();
+ }
- // Firstly, format working file
- if (!stopped)
- formatFile(res.getOrigWorkFile());
+ // Firstly, format working file
+ if (!stopped)
+ formatFile(res.getOrigWorkFile());
+ synchronized (this) {
// Then increase counter to allow rollover on clean working file
changeLastArchivedIndexAndWakeupCompressor(toArchive);
notifyAll();
}
+
if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED))
evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(),
res.getAbsIdx(), res.getDstArchiveFile()));
@@ -1369,7 +1372,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return curAbsWalIdx;
}
}
- catch (InterruptedException e) {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteInterruptedCheckedException(e);
@@ -1889,7 +1892,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @return I/O position after write version.
* @throws IOException If failed to write serializer version.
*/
- public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException {
+ public static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException {
ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false);
do {
@@ -1898,7 +1901,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
while (buffer.hasRemaining());
// Flush
- io.force();
+ if (mode == WALMode.DEFAULT)
+ io.force();
return io.position();
}
@@ -2205,9 +2209,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
public void writeSerializerVersion() throws IgniteCheckedException {
try {
- assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position();
+ assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " +
+ fileIO.position();
- long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx, serializer.version());
+ long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx,
+ serializer.version(), mode);
written = updatedPosition;
lastFsyncPos = updatedPosition;
[6/6] ignite git commit: zk
Posted by sb...@apache.org.
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/827b7085
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/827b7085
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/827b7085
Branch: refs/heads/ignite-zk
Commit: 827b708545cf851f784998c5faef874bda8e0898
Parents: 4797181
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 30 15:23:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 30 16:47:05 2017 +0300
----------------------------------------------------------------------
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +
.../discovery/zk/internal/ZookeeperClient.java | 44 ++
.../zk/internal/ZookeeperDiscoveryImpl.java | 500 ++++++++-----------
.../ZookeeperDiscoverySpiBasicTest.java | 31 ++
4 files changed, 287 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ed4c520..e7f86cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -440,6 +440,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+ if (oldMeta == mergedMeta)
+ return;
+
MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
assert res != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index ea9b289..73547cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -521,6 +521,13 @@ public class ZookeeperClient implements Watcher {
/**
* @param path Path.
+ */
+ void deleteIfExistsAsync(String path) {
+ new DeleteIfExistsOperation(path).execute();
+ }
+
+ /**
+ * @param path Path.
* @param watcher Watcher.
* @param cb Callback.
*/
@@ -842,6 +849,43 @@ public class ZookeeperClient implements Watcher {
/**
*
*/
+ class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, ZkAsyncOperation {
+ /** */
+ private final String path;
+
+ /**
+ * @param path Path.
+ */
+ DeleteIfExistsOperation(String path) {
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute() {
+ zk.delete(path, -1, this, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void processResult(int rc, String path, Object ctx) {
+ if (rc == KeeperException.Code.NONODE.intValue())
+ return;
+
+ if (needRetry(rc)) {
+ U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [" +
+ "path=" + path + ']');
+
+ retryQ.add(this);
+ }
+ else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+ U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+ else
+ assert rc == 0 : rc;
+ }
+ }
+
+ /**
+ *
+ */
class CreateCallbackWrapper implements AsyncCallback.StringCallback {
/** */
final CreateOperation op;
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 0ecbaf3..38f9cbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -27,9 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -127,16 +125,16 @@ public class ZookeeperDiscoveryImpl {
private final int evtsAckThreshold;
/** */
- private ZkRuntimeState state = new ZkRuntimeState(false);
+ private ZkRuntimeState state;
/** */
private volatile ConnectionState connState = ConnectionState.STARTED;
/** */
- private ZkEventWorker evtWorker;
+ private final AtomicBoolean stop = new AtomicBoolean();
/** */
- private final AtomicBoolean stop = new AtomicBoolean();
+ private final Object stateMux = new Object();
/**
* @param log Logger.
@@ -213,7 +211,8 @@ public class ZookeeperDiscoveryImpl {
* @return Ping result.
*/
public boolean pingNode(UUID nodeId) {
- checkState();
+ while (!busyLock.enterBusy())
+ checkState();
// TODO ZK
return node(nodeId) != null;
@@ -225,14 +224,96 @@ public class ZookeeperDiscoveryImpl {
public void reconnect() {
assert clientReconnectEnabled;
- evtWorker.onReconnectRequest();
+ synchronized (stateMux) {
+ if (connState == ConnectionState.STARTED)
+ connState = ConnectionState.DISCONNECTED;
+ else
+ return;
+ }
+
+ state.zkClient.onCloseStart();
+
+ busyLock.block();
+
+ busyLock.unblock();
+
+ state.zkClient.close();
+
+ UUID newId = UUID.randomUUID();
+
+ U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+ "newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode + ']');
+
+ doReconnect(newId);
+ }
+
+ /**
+ * @param newId New ID.
+ */
+ private void doReconnect(UUID newId) {
+ locNode.onClientDisconnected(newId);
+
+ if (state.joined) {
+ assert state.evtsData != null;
+
+ lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+ state.evtsData.topVer,
+ locNode,
+ state.top.topologySnapshot(),
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+ }
+
+ try {
+ joinTopology0(state.joined);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to reconnect: " + e, e);
+
+ onSegmented(e);
+ }
+ }
+
+
+ /**
+ * @param e Error.
+ */
+ private void onSegmented(Exception e) {
+ if (state.joined) {
+ synchronized (stateMux) {
+ connState = ConnectionState.STOPPED;
+ }
+
+ zkClient().zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), null);
+ }
+ else
+ joinFut.onDone(e);
+ }
+
+ /**
+ *
+ */
+ class SegmentedWatcher implements AsyncCallback.VoidCallback {
+ @Override public void processResult(int rc, String path, Object ctx) {
+ assert state.evtsData != null;
+
+ lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+ state.evtsData.topVer,
+ locNode,
+ state.top.topologySnapshot(),
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+ }
}
/**
* @return Remote nodes.
*/
public Collection<ClusterNode> remoteNodes() {
- checkState();
+ while (!busyLock.enterBusy())
+ checkState();
return state.top.remoteNodes();
}
@@ -258,14 +339,9 @@ public class ZookeeperDiscoveryImpl {
* @return {@code True} if node joined or joining topology.
*/
public boolean knownNode(UUID nodeId) {
- checkState();
-
- if (!busyLock.enterBusy()) {
+ while (!busyLock.enterBusy())
checkState();
- throw new IgniteSpiException("Zookeeper client closed.");
- }
-
try {
List<String> children = state.zkClient.getChildren(zkPaths.aliveNodesDir);
@@ -295,8 +371,6 @@ public class ZookeeperDiscoveryImpl {
* @param msg Message.
*/
public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
- checkState();
-
assert msg != null;
byte[] msgBytes;
@@ -308,12 +382,9 @@ public class ZookeeperDiscoveryImpl {
throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
}
- if (!busyLock.enterBusy()) {
+ while (!busyLock.enterBusy())
checkState();
- throw new IgniteSpiException("Zookeeper client closed.");
- }
-
try {
String prefix = UUID.randomUUID().toString();
@@ -367,7 +438,9 @@ public class ZookeeperDiscoveryImpl {
/**
* @throws InterruptedException If interrupted.
*/
- private void joinTopology0(boolean reconnect) throws InterruptedException {
+ private void joinTopology0(boolean prevJoined) throws InterruptedException {
+ state = new ZkRuntimeState(prevJoined);
+
DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
exchange.collect(discoDataBag);
@@ -394,14 +467,6 @@ public class ZookeeperDiscoveryImpl {
throw new IgniteSpiException("Failed to create Zookeeper client", e);
}
- if (!reconnect) {
- evtWorker = new ZkEventWorker(igniteInstanceName, "zookeeper-disco-evt-worker", log);
-
- evtWorker.start();
- }
- else
- assert evtWorker != null;
-
startJoin(joinDataBytes);
}
@@ -822,37 +887,32 @@ public class ZookeeperDiscoveryImpl {
if (log.isInfoEnabled())
log.info("Delete join data: " + joinDataPath);
- // TODO ZK async
- state.zkClient.deleteIfExists(joinDataPath, -1);
+ state.zkClient.deleteIfExistsAsync(joinDataPath);
final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- if (connState == ConnectionState.DISCONNECTED)
- connState = ConnectionState.STARTED;
-
- lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
- 1L,
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- if (state.prevJoined) {
- lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
- 1L,
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
- }
+ if (connState == ConnectionState.DISCONNECTED)
+ connState = ConnectionState.STARTED;
- joinFut.onDone();
- }
- });
+ lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
+ 1L,
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ if (state.prevJoined) {
+ lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+ 1L,
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+ }
+
+ joinFut.onDone();
}
/**
@@ -948,8 +1008,8 @@ public class ZookeeperDiscoveryImpl {
state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData, null);
- if (log.isInfoEnabled())
- log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
@@ -958,7 +1018,7 @@ public class ZookeeperDiscoveryImpl {
else {
U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
- state.zkClient.deleteIfExists(evtDataPath, -1);
+ state.zkClient.deleteIfExistsAsync(evtDataPath);
}
state.evtsData.procCustEvt = evtE.getKey();
@@ -1005,7 +1065,7 @@ public class ZookeeperDiscoveryImpl {
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void processNewEvents(ZkDiscoveryEventsData evtsData) throws Exception {
+ private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception {
TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
boolean updateNodeInfo = false;
@@ -1026,8 +1086,8 @@ public class ZookeeperDiscoveryImpl {
processLocalJoin(evtsData, evtData0);
}
else {
- if (log.isInfoEnabled())
- log.info("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
+ if (log.isDebugEnabled())
+ log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');
switch (evtData.eventType()) {
case EventType.EVT_NODE_JOINED: {
@@ -1165,40 +1225,35 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- if (connState == ConnectionState.DISCONNECTED)
- connState = ConnectionState.STARTED;
-
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- if (state.prevJoined) {
- lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
- evtData.topologyVersion(),
- locNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
-
- U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
- }
+ if (connState == ConnectionState.DISCONNECTED)
+ connState = ConnectionState.STARTED;
- joinFut.onDone();
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ if (state.prevJoined) {
+ lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+ evtData.topologyVersion(),
+ locNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
+
+ U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+ }
+
+ joinFut.onDone();
state.joined = true;
- if (log.isInfoEnabled())
- log.info("Delete data for joined: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Delete data for joined: " + path);
- // TODO ZK: async
- state.zkClient.deleteIfExists(path, -1);
+ state.zkClient.deleteIfExistsAsync(path);
}
/**
@@ -1207,8 +1262,8 @@ public class ZookeeperDiscoveryImpl {
*/
@SuppressWarnings("unchecked")
private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
- if (log.isInfoEnabled())
- log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
final ZookeeperClusterNode sndNode = state.top.nodesById.get(evtData.sndNodeId);
@@ -1216,16 +1271,12 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- sndNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- msg);
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ sndNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ msg);
}
/**
@@ -1245,16 +1296,12 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- joinedNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ joinedNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
}
/**
@@ -1268,16 +1315,12 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
- evtWorker.evtsQ.add(new Runnable() {
- @Override public void run() {
- lsnr.onDiscovery(evtData.eventType(),
- evtData.topologyVersion(),
- failedNode,
- topSnapshot,
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
- });
+ lsnr.onDiscovery(evtData.eventType(),
+ evtData.topologyVersion(),
+ failedNode,
+ topSnapshot,
+ Collections.<Long, Collection<ClusterNode>>emptyMap(),
+ null);
}
/**
@@ -1326,8 +1369,8 @@ public class ZookeeperDiscoveryImpl {
String path = zkPaths.ackEventDataPath(evtId);
- if (log.isInfoEnabled())
- log.info("Create ack event: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Create ack event: " + path);
// TODO ZK: delete is previous exists?
state.zkClient.createIfNeeded(
@@ -1349,9 +1392,10 @@ public class ZookeeperDiscoveryImpl {
newEvts.add(ackEvtData);
- if (log.isInfoEnabled()) {
- log.info("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
+ if (log.isDebugEnabled()) {
+ log.debug("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
", evt=" + ackEvtData +
+ ", evtSize=" + ackBytes.length +
", msg=" + ack + ']');
}
}
@@ -1360,8 +1404,8 @@ public class ZookeeperDiscoveryImpl {
}
case EventType.EVT_NODE_FAILED: {
- if (log.isInfoEnabled())
- log.info("All nodes processed node fail [evtData=" + evtData + ']');
+ if (log.isDebugEnabled())
+ log.debug("All nodes processed node fail [evtData=" + evtData + ']');
break; // Do not need addition cleanup.
}
@@ -1417,8 +1461,8 @@ public class ZookeeperDiscoveryImpl {
if (log.isInfoEnabled())
log.info("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']');
- state.zkClient.deleteIfExists(evtDataPath, -1);
- state.zkClient.deleteIfExists(dataForJoinedPath, -1);
+ state.zkClient.deleteIfExistsAsync(evtDataPath);
+ state.zkClient.deleteIfExistsAsync(dataForJoinedPath);
}
/**
@@ -1429,15 +1473,16 @@ public class ZookeeperDiscoveryImpl {
@Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
throws Exception
{
- if (log.isInfoEnabled())
- log.info("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
+ if (log.isDebugEnabled())
+ log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
if (!evtData.ackEvent()) {
String path = zkPaths.customEventDataPath(false, evtData.evtPath);
- log.info("Delete path: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Delete path: " + path);
- state.zkClient.deleteIfExists(path, -1);
+ state.zkClient.deleteIfExistsAsync(path);
assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
@@ -1447,9 +1492,10 @@ public class ZookeeperDiscoveryImpl {
else {
String path = zkPaths.ackEventDataPath(evtData.eventId());
- log.info("Delete path: " + path);
+ if (log.isDebugEnabled())
+ log.debug("Delete path: " + path);
- state.zkClient.deleteIfExists(path, -1);
+ state.zkClient.deleteIfExistsAsync(path);
}
return null;
@@ -1472,7 +1518,9 @@ public class ZookeeperDiscoveryImpl {
log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']');
- connState = ConnectionState.STOPPED;
+ synchronized (stateMux) {
+ connState = ConnectionState.STOPPED;
+ }
ZookeeperClient zkClient = state.zkClient;
@@ -1481,18 +1529,12 @@ public class ZookeeperDiscoveryImpl {
busyLock.block();
+ busyLock.unblock();
+
joinFut.onDone(e);
if (zkClient != null)
zkClient.close();
-
- ZkEventWorker evtWorker = this.evtWorker;
-
- if (evtWorker != null) {
- evtWorker.interrupt();
-
- evtWorker.join();
- }
}
/**
@@ -1552,165 +1594,27 @@ public class ZookeeperDiscoveryImpl {
/**
*
*/
- private class ZkEventWorker extends IgniteSpiThread {
- /** */
- private final Runnable RECONNECT = new Runnable() {@Override public void run() {}};
-
- /** */
- private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}};
-
- /** */
- private final BlockingQueue<Runnable> evtsQ;
-
- /**
- * @param igniteInstanceName Ignite instance name.
- * @param name Thread name.
- * @param log Logger.
- */
- ZkEventWorker(String igniteInstanceName, String name, IgniteLogger log) {
- super(igniteInstanceName, name, log);
-
- evtsQ = new LinkedBlockingQueue<>();
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- while (!isInterrupted()) {
- Runnable r = evtsQ.take();
-
- if (r == RECONNECT)
- processReconnect();
- if (r == CONNECTION_LOST)
- processConnectionLost();
- else {
- if (!busyLock.enterBusy())
- return;
-
- try {
- r.run();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- }
- }
-
+ private class ReconnectorThread extends IgniteSpiThread {
/**
*
*/
- void onConnectionLoss() {
- evtsQ.add(CONNECTION_LOST);
+ ReconnectorThread() {
+ super(ZookeeperDiscoveryImpl.this.igniteInstanceName, "zk-reconnector", log);
}
- /**
- *
- */
- void onReconnectRequest() {
- evtsQ.add(RECONNECT);
- }
-
- /**
- *
- */
- private void processReconnect() {
- assert locNode.isClient() : locNode;
-
- if (connState == ConnectionState.DISCONNECTED)
- return;
-
- connState = ConnectionState.DISCONNECTED;
-
- state.zkClient.onCloseStart();
-
+ @Override protected void body() throws InterruptedException {
busyLock.block();
busyLock.unblock();
- state.zkClient.close();
-
UUID newId = UUID.randomUUID();
- U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+ U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
"newId=" + newId +
", prevId=" + locNode.id() +
", locNode=" + locNode + ']');
- reconnect(newId);
- }
-
- /**
- *
- */
- void processConnectionLost() {
- if (clientReconnectEnabled) {
- connState = ConnectionState.DISCONNECTED;
-
- busyLock.block();
-
- busyLock.unblock();
-
- UUID newId = UUID.randomUUID();
-
- U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
- "newId=" + newId +
- ", prevId=" + locNode.id() +
- ", locNode=" + locNode + ']');
-
- reconnect(newId);
- }
- else {
- U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
-
- onSegmented(new IgniteSpiException("Zookeeper connection loss."));
- }
- }
-
- /**
- * @param newId New ID.
- */
- private void reconnect(UUID newId) {
- locNode.onClientDisconnected(newId);
-
- if (state.joined) {
- assert state.evtsData != null;
-
- lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
- state.evtsData.topVer,
- locNode,
- state.top.topologySnapshot(),
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
-
- state = new ZkRuntimeState(state.joined);
-
- try {
- joinTopology0(true);
- }
- catch (Exception e) {
- U.error(log, "Failed to reconnect: " + e, e);
-
- onSegmented(e);
- }
- }
-
- /**
- * @param e Error.
- */
- private void onSegmented(Exception e) {
- if (state.joined) {
- assert state.evtsData != null;
-
- lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
- state.evtsData.topVer,
- locNode,
- state.top.topologySnapshot(),
- Collections.<Long, Collection<ClusterNode>>emptyMap(),
- null);
- }
- else
- joinFut.onDone(e);
+ doReconnect(newId);
}
}
@@ -1723,7 +1627,21 @@ public class ZookeeperDiscoveryImpl {
/** {@inheritDoc} */
@Override public void run() {
- evtWorker.onConnectionLoss();
+ if (clientReconnectEnabled) {
+ synchronized (stateMux) {
+ if (connState == ConnectionState.STARTED)
+ connState = ConnectionState.DISCONNECTED;
+ else
+ return;
+ }
+
+ new ReconnectorThread().start();
+ }
+ else {
+ U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
+
+ onSegmented(new IgniteSpiException("Zookeeper connection loss."));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/827b7085/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 8ae84c1..d50e9b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.io.File;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -306,6 +307,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testMetadataUpdate() throws Exception {
+ startGrid(0);
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ignite(0).configuration().getMarshaller().marshal(new C1());
+ ignite(0).configuration().getMarshaller().marshal(new C2());
+
+ return null;
+ }
+ }, 64, "marshal");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testClientNodesStatus() throws Exception {
startGrid(0);
@@ -1661,4 +1678,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
return data;
}
}
+
+ /**
+ *
+ */
+ private static class C1 implements Serializable {
+ // No-op.
+ }
+
+ /**
+ *
+ */
+ private static class C2 implements Serializable {
+ // No-op.
+ }
}