You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/11/29 10:50:37 UTC
[1/2] ignite git commit: IGNITE-6992: Hadoop: fixed MR problem with
HDFS access when Kerberos is enabled. This closes #3097.
Repository: ignite
Updated Branches:
refs/heads/master 89c82f5fa -> 5fa9de3ef
IGNITE-6992: Hadoop: fixed MR problem with HDFS access when Kerberos is enabled. This closes #3097.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/438760ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/438760ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/438760ed
Branch: refs/heads/master
Commit: 438760ed7f9d37bb72de5e5a38d46ce2450544f8
Parents: 5fa5ae7
Author: Evgenii Zhuravlev <e....@gmail.com>
Authored: Wed Nov 29 13:50:13 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:13 2017 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopDefaultJobInfo.java | 15 ++++-
.../processors/hadoop/HadoopJobInfo.java | 7 +++
.../processors/hadoop/impl/HadoopUtils.java | 45 +++++++++++++-
.../impl/fs/HadoopFileSystemCacheUtils.java | 34 ++++++-----
.../hadoop/impl/proto/HadoopClientProtocol.java | 8 ++-
.../processors/hadoop/impl/v2/HadoopV2Job.java | 32 ++++++++--
.../hadoop/impl/v2/HadoopV2TaskContext.java | 62 +++++++++++++-------
.../impl/HadoopAbstractMapReduceTest.java | 2 +-
.../hadoop/impl/HadoopGroupingTest.java | 2 +-
.../hadoop/impl/HadoopJobTrackerSelfTest.java | 4 +-
.../impl/HadoopMapReduceEmbeddedSelfTest.java | 2 +-
.../hadoop/impl/HadoopPlannerMockJob.java | 6 ++
.../hadoop/impl/HadoopSortingTest.java | 4 +-
.../impl/HadoopTaskExecutionSelfTest.java | 10 ++--
.../hadoop/impl/HadoopTasksV1Test.java | 2 +-
.../hadoop/impl/HadoopTasksV2Test.java | 2 +-
.../hadoop/impl/HadoopTeraSortTest.java | 2 +-
.../hadoop/impl/HadoopV2JobSelfTest.java | 2 +-
.../collections/HadoopAbstractMapTest.java | 6 ++
.../HadoopExternalTaskExecutionSelfTest.java | 4 +-
20 files changed, 189 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d4a29b2..a66f501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -52,6 +52,9 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
/** User name. */
private String user;
+ /** Credentials. */
+ private byte[] credentials;
+
/**
* Default constructor required by {@link Externalizable}.
*/
@@ -69,12 +72,13 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
* @param props All other properties of the job.
*/
public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
- Map<String, String> props) {
+ Map<String, String> props, byte[] credentials) {
this.jobName = jobName;
this.user = user;
this.hasCombiner = hasCombiner;
this.numReduces = numReduces;
this.props = props;
+ this.credentials = credentials;
}
/** {@inheritDoc} */
@@ -127,6 +131,11 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
}
/** {@inheritDoc} */
+ @Override public byte[] credentials() {
+ return credentials;
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, jobName);
U.writeString(out, user);
@@ -135,6 +144,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
out.writeInt(numReduces);
IgfsUtils.writeStringMap(out, props);
+
+ U.writeByteArray(out, credentials);
}
/** {@inheritDoc} */
@@ -146,6 +157,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
numReduces = in.readInt();
props = IgfsUtils.readStringMap(in);
+
+ credentials = U.readByteArray(in);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 4cc8f80..3dffbc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -69,6 +69,13 @@ public interface HadoopJobInfo {
String user();
/**
+ * Gets credentials.
+ *
+ * @return Credentials.
+ */
+ byte[] credentials();
+
+ /**
* Creates new job instance for the given ID.
* {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution.
* This method will be called once for the same ID on one node, though it can be called on the same host
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
index 767e10a..89c60b9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.impl;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedBytes;
+import java.io.DataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
@@ -27,6 +28,8 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
@@ -211,10 +214,12 @@ public class HadoopUtils {
* Creates JobInfo from hadoop configuration.
*
* @param cfg Hadoop configuration.
+ * @param credentials Credentials.
* @return Job info.
* @throws IgniteCheckedException If failed.
*/
- public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
+ public static HadoopDefaultJobInfo createJobInfo(Configuration cfg, byte[] credentials)
+ throws IgniteCheckedException {
JobConf jobConf = new JobConf(cfg);
boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
@@ -269,7 +274,8 @@ public class HadoopUtils {
for (Map.Entry<String, String> entry : jobConf)
props.put(entry.getKey(), entry.getValue());
- return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
+ return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props,
+ credentials);
}
/**
@@ -394,4 +400,39 @@ public class HadoopUtils {
return len1 - len2;
}
+
+ /**
+ * Deserialization of Hadoop Writable object.
+ *
+ * @param writable Writable object to deserialize to.
+ * @param bytes byte array to deserialize.
+ */
+ public static void deserialize(Writable writable, byte[] bytes) throws IOException {
+ DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
+
+ writable.readFields(dataIn);
+
+ dataIn.close();
+ }
+
+ /**
+ * Create UserGroupInformation for specified user and credentials.
+ *
+ * @param user User.
+ * @param credentialsBytes Credentials byte array.
+ */
+ public static UserGroupInformation createUGI(String user, byte[] credentialsBytes) throws IOException {
+ Credentials credentials = new Credentials();
+
+ HadoopUtils.deserialize(credentials, credentialsBytes);
+
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+
+ ugi.addCredentials(credentials);
+
+ if (credentials.numberOfTokens() > 0)
+ ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN);
+
+ return ugi;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
index 0b673e9..f48d21d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.impl.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteException;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
import org.apache.ignite.internal.util.GridStringBuilder;
@@ -41,27 +42,32 @@ public class HadoopFileSystemCacheUtils {
return new HadoopLazyConcurrentMap<>(
new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
@Override public FileSystem createValue(FsCacheKey key) throws IOException {
- try {
- assert key != null;
+ assert key != null;
- // Explicitly disable FileSystem caching:
- URI uri = key.uri();
+ // Explicitly disable FileSystem caching:
+ URI uri = key.uri();
- String scheme = uri.getScheme();
+ String scheme = uri.getScheme();
- // Copy the configuration to avoid altering the external object.
- Configuration cfg = new Configuration(key.configuration());
+ // Copy the configuration to avoid altering the external object.
+ Configuration cfg = new Configuration(key.configuration());
- String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+ String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
- cfg.setBoolean(prop, true);
+ cfg.setBoolean(prop, true);
- return FileSystem.get(uri, cfg, key.user());
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() ==
+ UserGroupInformation.AuthenticationMethod.TOKEN)
+ return FileSystem.get(uri, cfg);
+ else {
+ try {
+ return FileSystem.get(uri, cfg, key.user());
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- throw new IOException("Failed to create file system due to interrupt.", e);
+ throw new IOException("Failed to create file system due to interrupt.", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
index 7fc0e77..811b0c2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -122,8 +123,13 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
+ byte[] credentials = null;
+
+ if (ts != null)
+ credentials = WritableUtils.toByteArray(ts);
+
HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class,
- jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf));
+ jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf, credentials));
if (status == null)
throw new IOException("Failed to submit job (null status obtained): " + jobId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index 2a85cb8..28b4d6b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.hadoop.impl.v2;
+import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Splitter;
@@ -317,7 +320,7 @@ public class HadoopV2Job extends HadoopJobEx {
}
/** {@inheritDoc} */
- @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
+ @Override public void initialize(final boolean external, final UUID locNodeId) throws IgniteCheckedException {
assert locNodeId != null;
this.locNodeId = locNodeId;
@@ -325,15 +328,36 @@ public class HadoopV2Job extends HadoopJobEx {
ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
try {
- rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+ if (jobInfo.credentials() == null)
+ rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+ else {
+ UserGroupInformation ugi = HadoopUtils.createUGI(jobInfo.user(), jobInfo.credentials());
+
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override public Void run() throws Exception {
+ rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId,
+ jobId));
+
+ return null;
+ }
+ });
+ }
+ catch (IOException | InterruptedException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true)) {
- U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() + " job property is set to true; please disable " +
- "it if job tasks rely on mutable static state.");
+ U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() +
+ " job property is set to true; please disable " + "it if job tasks rely on mutable static state.");
sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId));
}
}
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
finally {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 6127822..c362b0c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
@@ -548,41 +549,58 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
/** {@inheritDoc} */
@Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
- String user = job.info().user();
+ if (job.info().credentials() == null) {
+ String user = job.info().user();
- user = IgfsUtils.fixUserName(user);
+ user = IgfsUtils.fixUserName(user);
- assert user != null;
+ assert user != null;
- String ugiUser;
+ String ugiUser;
- try {
- UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+ try {
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
- assert currUser != null;
+ assert currUser != null;
- ugiUser = currUser.getShortUserName();
- }
- catch (IOException ioe) {
- throw new IgniteCheckedException(ioe);
- }
+ ugiUser = currUser.getShortUserName();
+ }
+ catch (IOException ioe) {
+ throw new IgniteCheckedException(ioe);
+ }
- try {
- if (F.eq(user, ugiUser))
- // if current UGI context user is the same, do direct call:
- return c.call();
- else {
- UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+ try {
+ if (F.eq(user, ugiUser))
+ // if current UGI context user is the same, do direct call:
+ return c.call();
+ else {
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
+ @Override public T run() throws Exception {
+ return c.call();
+ }
+ });
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+ else {
+ try {
+ UserGroupInformation ugi = HadoopUtils.createUGI(job.info().user(), job.info().credentials());
return ugi.doAs(new PrivilegedExceptionAction<T>() {
- @Override public T run() throws Exception {
+ @Override
+ public T run() throws Exception {
return c.call();
}
});
}
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 4928e3d..fc6d7f8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -207,7 +207,7 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
index 2de2d19..d27a234 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
@@ -127,7 +127,7 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest {
}
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
- createJobInfo(job.getConfiguration())).get(30000);
+ createJobInfo(job.getConfiguration(), null)).get(30000);
assertTrue(HadoopGroupingTestState.values().isEmpty());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
index 381652e..c3b3040 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -121,7 +121,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(globalId, 1);
- grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
checkStatus(jobId, false);
@@ -168,7 +168,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(globalId, 1);
- grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
checkStatus(jobId, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 6eb16af..21b7ee2 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
job.setJarByClass(HadoopWordCount2.class);
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
index 28c8264..b3368bd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
@@ -178,5 +178,11 @@ public class HadoopPlannerMockJob extends HadoopJobEx {
return null;
}
+
+ @Override public byte[] credentials() {
+ throwUnsupported();
+
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
index 2e85cce..bb11ccb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
@@ -117,7 +117,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
X.printerrln("Data generation started.");
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration())).get(180000);
+ createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Data generation complete.");
@@ -148,7 +148,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
X.printerrln("Job started.");
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
- createJobInfo(job.getConfiguration())).get(180000);
+ createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Job complete.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
index c27a67f..2394ada 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
job.setJarByClass(getClass());
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
fut.get();
@@ -188,7 +188,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
@@ -226,7 +226,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
job.setJarByClass(getClass());
final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -313,7 +313,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
- final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
+ final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg, null));
if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
@@ -364,7 +364,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
assertFalse(killRes);
- final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
+ final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg, null));
if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
index 1d7f3e4..ca96551 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
@@ -46,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
setupFileSystems(jobConf);
- HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+ HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf, null);
UUID uuid = new UUID(0, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
index 61e3e46..0fcd358 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
@@ -65,7 +65,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile);
- HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
+ HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration(), null);
UUID uuid = new UUID(0, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index d8b74ce..46752a8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -178,7 +178,7 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
index 2c2f049..041f0bc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
@@ -80,7 +80,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
cfg.setMapOutputValueClass(Text.class);
cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
- HadoopDefaultJobInfo info = createJobInfo(cfg);
+ HadoopDefaultJobInfo info = createJobInfo(cfg, null);
final UUID uuid = UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
index b9dcae1..49be0a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
@@ -177,5 +177,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
return null;
}
+
+ @Override public byte[] credentials() {
+ assert false;
+
+ return null;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
index 0afd689..1246078 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -117,7 +117,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
job.setJarByClass(getClass());
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
fut.get();
}
@@ -153,7 +153,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
job.setJarByClass(getClass());
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
+ createJobInfo(job.getConfiguration(), null));
try {
fut.get();
[2/2] ignite git commit: Merge remote-tracking branch 'origin/master'
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fa9de3e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fa9de3e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fa9de3e
Branch: refs/heads/master
Commit: 5fa9de3ef302937e2cea22a2f0f29c957e5c7313
Parents: 438760e 89c82f5
Author: devozerov <vo...@gridgain.com>
Authored: Wed Nov 29 13:50:33 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:33 2017 +0300
----------------------------------------------------------------------
.../cache/binary/BinaryMetadataTransport.java | 20 ++-
.../continuous/GridContinuousProcessor.java | 132 ++++++++++++-------
2 files changed, 93 insertions(+), 59 deletions(-)
----------------------------------------------------------------------