You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/11/29 10:50:37 UTC

[1/2] ignite git commit: IGNITE-6992: Hadoop: fixed MR problem with HDFS access when Kerberos is enabled. This closes #3097.

Repository: ignite
Updated Branches:
  refs/heads/master 89c82f5fa -> 5fa9de3ef


IGNITE-6992: Hadoop: fixed MR problem with HDFS access when Kerberos is enabled. This closes #3097.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/438760ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/438760ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/438760ed

Branch: refs/heads/master
Commit: 438760ed7f9d37bb72de5e5a38d46ce2450544f8
Parents: 5fa5ae7
Author: Evgenii Zhuravlev <e....@gmail.com>
Authored: Wed Nov 29 13:50:13 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:13 2017 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopDefaultJobInfo.java | 15 ++++-
 .../processors/hadoop/HadoopJobInfo.java        |  7 +++
 .../processors/hadoop/impl/HadoopUtils.java     | 45 +++++++++++++-
 .../impl/fs/HadoopFileSystemCacheUtils.java     | 34 ++++++-----
 .../hadoop/impl/proto/HadoopClientProtocol.java |  8 ++-
 .../processors/hadoop/impl/v2/HadoopV2Job.java  | 32 ++++++++--
 .../hadoop/impl/v2/HadoopV2TaskContext.java     | 62 +++++++++++++-------
 .../impl/HadoopAbstractMapReduceTest.java       |  2 +-
 .../hadoop/impl/HadoopGroupingTest.java         |  2 +-
 .../hadoop/impl/HadoopJobTrackerSelfTest.java   |  4 +-
 .../impl/HadoopMapReduceEmbeddedSelfTest.java   |  2 +-
 .../hadoop/impl/HadoopPlannerMockJob.java       |  6 ++
 .../hadoop/impl/HadoopSortingTest.java          |  4 +-
 .../impl/HadoopTaskExecutionSelfTest.java       | 10 ++--
 .../hadoop/impl/HadoopTasksV1Test.java          |  2 +-
 .../hadoop/impl/HadoopTasksV2Test.java          |  2 +-
 .../hadoop/impl/HadoopTeraSortTest.java         |  2 +-
 .../hadoop/impl/HadoopV2JobSelfTest.java        |  2 +-
 .../collections/HadoopAbstractMapTest.java      |  6 ++
 .../HadoopExternalTaskExecutionSelfTest.java    |  4 +-
 20 files changed, 189 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d4a29b2..a66f501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -52,6 +52,9 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
     /** User name. */
     private String user;
 
+    /** Credentials. */
+    private byte[] credentials;
+
     /**
      * Default constructor required by {@link Externalizable}.
      */
@@ -69,12 +72,13 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
      * @param props All other properties of the job.
      */
     public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
-        Map<String, String> props) {
+        Map<String, String> props, byte[] credentials) {
         this.jobName = jobName;
         this.user = user;
         this.hasCombiner = hasCombiner;
         this.numReduces = numReduces;
         this.props = props;
+        this.credentials = credentials;
     }
 
     /** {@inheritDoc} */
@@ -127,6 +131,11 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public byte[] credentials() {
+        return credentials;
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, jobName);
         U.writeString(out, user);
@@ -135,6 +144,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
         out.writeInt(numReduces);
 
         IgfsUtils.writeStringMap(out, props);
+
+        U.writeByteArray(out, credentials);
     }
 
     /** {@inheritDoc} */
@@ -146,6 +157,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
         numReduces = in.readInt();
 
         props = IgfsUtils.readStringMap(in);
+
+        credentials = U.readByteArray(in);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 4cc8f80..3dffbc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -69,6 +69,13 @@ public interface HadoopJobInfo {
     String user();
 
     /**
+     * Gets credentials.
+     *
+     * @return Credentials.
+     */
+    byte[] credentials();
+
+    /**
      * Creates new job instance for the given ID.
      * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution.
      * This method will be called once for the same ID on one node, though it can be called on the same host

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
index 767e10a..89c60b9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.impl;
 
 import com.google.common.primitives.Longs;
 import com.google.common.primitives.UnsignedBytes;
+import java.io.DataInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -27,6 +28,8 @@ import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
@@ -211,10 +214,12 @@ public class HadoopUtils {
      * Creates JobInfo from hadoop configuration.
      *
      * @param cfg Hadoop configuration.
+     * @param credentials Credentials.
      * @return Job info.
      * @throws IgniteCheckedException If failed.
      */
-    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
+    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg, byte[] credentials)
+        throws IgniteCheckedException {
         JobConf jobConf = new JobConf(cfg);
 
         boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
@@ -269,7 +274,8 @@ public class HadoopUtils {
         for (Map.Entry<String, String> entry : jobConf)
             props.put(entry.getKey(), entry.getValue());
 
-        return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
+        return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props,
+            credentials);
     }
 
     /**
@@ -394,4 +400,39 @@ public class HadoopUtils {
 
         return len1 - len2;
     }
+
+    /**
+     * Deserializes a Hadoop Writable object from a byte array.
+     *
+     * @param writable Writable object to deserialize to.
+     * @param bytes Byte array to deserialize.
+     */
+    public static void deserialize(Writable writable, byte[] bytes) throws IOException {
+        DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
+
+        writable.readFields(dataIn);
+
+        dataIn.close();
+    }
+
+    /**
+     * Creates a UserGroupInformation for the specified user and credentials.
+     *
+     * @param user User.
+     * @param credentialsBytes Credentials byte array.
+     */
+    public static UserGroupInformation createUGI(String user, byte[] credentialsBytes) throws IOException {
+        Credentials credentials = new Credentials();
+
+        HadoopUtils.deserialize(credentials, credentialsBytes);
+
+        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+
+        ugi.addCredentials(credentials);
+
+        if (credentials.numberOfTokens() > 0)
+            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN);
+
+        return ugi;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
index 0b673e9..f48d21d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.impl.fs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.util.GridStringBuilder;
@@ -41,27 +42,32 @@ public class HadoopFileSystemCacheUtils {
         return new HadoopLazyConcurrentMap<>(
             new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
                 @Override public FileSystem createValue(FsCacheKey key) throws IOException {
-                    try {
-                        assert key != null;
+                    assert key != null;
 
-                        // Explicitly disable FileSystem caching:
-                        URI uri = key.uri();
+                    // Explicitly disable FileSystem caching:
+                    URI uri = key.uri();
 
-                        String scheme = uri.getScheme();
+                    String scheme = uri.getScheme();
 
-                        // Copy the configuration to avoid altering the external object.
-                        Configuration cfg = new Configuration(key.configuration());
+                    // Copy the configuration to avoid altering the external object.
+                    Configuration cfg = new Configuration(key.configuration());
 
-                        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+                    String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
 
-                        cfg.setBoolean(prop, true);
+                    cfg.setBoolean(prop, true);
 
-                        return FileSystem.get(uri, cfg, key.user());
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
+                    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() ==
+                        UserGroupInformation.AuthenticationMethod.TOKEN)
+                        return FileSystem.get(uri, cfg);
+                    else {
+                        try {
+                            return FileSystem.get(uri, cfg, key.user());
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
 
-                        throw new IOException("Failed to create file system due to interrupt.", e);
+                            throw new IOException("Failed to create file system due to interrupt.", e);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
index 7fc0e77..811b0c2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -122,8 +123,13 @@ public class HadoopClientProtocol implements ClientProtocol {
         try {
             conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
 
+            byte[] credentials = null;
+
+            if (ts != null)
+                credentials = WritableUtils.toByteArray(ts);
+
             HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class,
-                jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf));
+                jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf, credentials));
 
             if (status == null)
                 throw new IOException("Failed to submit job (null status obtained): " + jobId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index 2a85cb8..28b4d6b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop.impl.v2;
 
+import java.security.PrivilegedExceptionAction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Splitter;
@@ -317,7 +320,7 @@ public class HadoopV2Job extends HadoopJobEx {
     }
 
     /** {@inheritDoc} */
-    @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
+    @Override public void initialize(final boolean external, final UUID locNodeId) throws IgniteCheckedException {
         assert locNodeId != null;
 
         this.locNodeId = locNodeId;
@@ -325,15 +328,36 @@ public class HadoopV2Job extends HadoopJobEx {
         ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
 
         try {
-            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+            if (jobInfo.credentials() == null)
+                rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+            else {
+                UserGroupInformation ugi = HadoopUtils.createUGI(jobInfo.user(), jobInfo.credentials());
+
+                try {
+                    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                        @Override public Void run() throws Exception {
+                            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId,
+                                jobId));
+
+                            return null;
+                        }
+                    });
+                }
+                catch (IOException | InterruptedException e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
 
             if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true)) {
-                U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() + " job property is set to true; please disable " +
-                    "it if job tasks rely on mutable static state.");
+                U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() +
+                    " job property is set to true; please disable " + "it if job tasks rely on mutable static state.");
 
                 sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId));
             }
         }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
         finally {
             HadoopCommonUtils.restoreContextClassLoader(oldLdr);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 6127822..c362b0c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
@@ -548,41 +549,58 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
 
     /** {@inheritDoc} */
     @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
-        String user = job.info().user();
+        if (job.info().credentials() == null) {
+            String user = job.info().user();
 
-        user = IgfsUtils.fixUserName(user);
+            user = IgfsUtils.fixUserName(user);
 
-        assert user != null;
+            assert user != null;
 
-        String ugiUser;
+            String ugiUser;
 
-        try {
-            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+            try {
+                UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
 
-            assert currUser != null;
+                assert currUser != null;
 
-            ugiUser = currUser.getShortUserName();
-        }
-        catch (IOException ioe) {
-            throw new IgniteCheckedException(ioe);
-        }
+                ugiUser = currUser.getShortUserName();
+            }
+            catch (IOException ioe) {
+                throw new IgniteCheckedException(ioe);
+            }
 
-        try {
-            if (F.eq(user, ugiUser))
-                // if current UGI context user is the same, do direct call:
-                return c.call();
-            else {
-                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+            try {
+                if (F.eq(user, ugiUser))
+                    // if current UGI context user is the same, do direct call:
+                    return c.call();
+                else {
+                    UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                    return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                        @Override public T run() throws Exception {
+                            return c.call();
+                        }
+                    });
+                }
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+        else {
+            try {
+                UserGroupInformation ugi = HadoopUtils.createUGI(job.info().user(), job.info().credentials());
 
                 return ugi.doAs(new PrivilegedExceptionAction<T>() {
-                    @Override public T run() throws Exception {
+                    @Override
+                    public T run() throws Exception {
                         return c.call();
                     }
                 });
             }
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException(e);
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 4928e3d..fc6d7f8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -207,7 +207,7 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
 
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
         fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
index 2de2d19..d27a234 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
@@ -127,7 +127,7 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest {
         }
 
         grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
-            createJobInfo(job.getConfiguration())).get(30000);
+            createJobInfo(job.getConfiguration(), null)).get(30000);
 
         assertTrue(HadoopGroupingTestState.values().isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
index 381652e..c3b3040 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -121,7 +121,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
 
             HadoopJobId jobId = new HadoopJobId(globalId, 1);
 
-            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
             checkStatus(jobId, false);
 
@@ -168,7 +168,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
 
             HadoopJobId jobId = new HadoopJobId(globalId, 1);
 
-            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
             checkStatus(jobId, false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 6eb16af..21b7ee2 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
             job.setJarByClass(HadoopWordCount2.class);
 
             IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-                    createJobInfo(job.getConfiguration()));
+                    createJobInfo(job.getConfiguration(), null));
 
             fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
index 28c8264..b3368bd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
@@ -178,5 +178,11 @@ public class HadoopPlannerMockJob extends HadoopJobEx {
 
             return null;
         }
+
+        @Override public byte[] credentials() {
+            throwUnsupported();
+
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
index 2e85cce..bb11ccb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
@@ -117,7 +117,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
         X.printerrln("Data generation started.");
 
         grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration())).get(180000);
+            createJobInfo(job.getConfiguration(), null)).get(180000);
 
         X.printerrln("Data generation complete.");
 
@@ -148,7 +148,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest {
         X.printerrln("Job started.");
 
         grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
-            createJobInfo(job.getConfiguration())).get(180000);
+            createJobInfo(job.getConfiguration(), null)).get(180000);
 
         X.printerrln("Job complete.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
index c27a67f..2394ada 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -143,7 +143,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
         job.setJarByClass(getClass());
 
         IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-                createJobInfo(job.getConfiguration()));
+                createJobInfo(job.getConfiguration(), null));
 
         fut.get();
 
@@ -188,7 +188,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
 
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
         fut.get();
 
@@ -226,7 +226,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
         job.setJarByClass(getClass());
 
         final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3),
-                createJobInfo(job.getConfiguration()));
+                createJobInfo(job.getConfiguration(), null));
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -313,7 +313,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
 
-        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg, null));
 
         if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
@@ -364,7 +364,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
         assertFalse(killRes);
 
-        final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
+        final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg, null));
 
         if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
index 1d7f3e4..ca96551 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
@@ -46,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
 
         setupFileSystems(jobConf);
 
-        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf, null);
 
         UUID uuid = new UUID(0, 0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
index 61e3e46..0fcd358 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
@@ -65,7 +65,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
 
         Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile);
 
-        HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
+        HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration(), null);
 
         UUID uuid = new UUID(0, 0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index d8b74ce..46752a8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -178,7 +178,7 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
 
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
 
         fut.get();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
index 2c2f049..041f0bc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
@@ -80,7 +80,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
         cfg.setMapOutputValueClass(Text.class);
         cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
-        HadoopDefaultJobInfo info = createJobInfo(cfg);
+        HadoopDefaultJobInfo info = createJobInfo(cfg, null);
 
         final UUID uuid = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
index b9dcae1..49be0a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
@@ -177,5 +177,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
 
             return null;
         }
+
+        @Override public byte[] credentials() {
+            assert false;
+
+            return null;
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
index 0afd689..1246078 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -117,7 +117,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
         job.setJarByClass(getClass());
 
         IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration()));
+            createJobInfo(job.getConfiguration(), null));
 
         fut.get();
     }
@@ -153,7 +153,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
         job.setJarByClass(getClass());
 
         IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration()));
+            createJobInfo(job.getConfiguration(), null));
 
         try {
             fut.get();


[2/2] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fa9de3e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fa9de3e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fa9de3e

Branch: refs/heads/master
Commit: 5fa9de3ef302937e2cea22a2f0f29c957e5c7313
Parents: 438760e 89c82f5
Author: devozerov <vo...@gridgain.com>
Authored: Wed Nov 29 13:50:33 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Nov 29 13:50:33 2017 +0300

----------------------------------------------------------------------
 .../cache/binary/BinaryMetadataTransport.java   |  20 ++-
 .../continuous/GridContinuousProcessor.java     | 132 ++++++++++++-------
 2 files changed, 93 insertions(+), 59 deletions(-)
----------------------------------------------------------------------